Skip to content

Commit

Permalink
locks on resource maps
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Jul 16, 2024
1 parent 697f447 commit 92592da
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 33 deletions.
40 changes: 18 additions & 22 deletions scrapers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/Jeffail/gabs/v2"
Expand Down Expand Up @@ -36,9 +35,7 @@ import (

const ConfigTypePrefix = "Kubernetes::"

// TODO: this lock should be per scraper(cluster)
var allClustersResourceIDMap = map[string]ResourceIDMap{}
var clusterResourceIDLock sync.Mutex
var resourceIDMapPerCluster PerClusterResourceIDMap

// ReservedAnnotations
const (
Expand Down Expand Up @@ -231,7 +228,7 @@ func getObjectChangeExclusionAnnotations(ctx api.ScrapeContext, id string, exclu

// ExtractResults extracts scrape results from the given list of Kubernetes objects.
// - withCluster: if true, will create & add a scrape result for the kubernetes cluster.
func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstructured.Unstructured, withCluster bool) v1.ScrapeResults {
func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstructured.Unstructured, isIncrementalScrape bool) v1.ScrapeResults {
var (
results v1.ScrapeResults
changeResults v1.ScrapeResults
Expand All @@ -258,16 +255,17 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc
Tags: v1.Tags{{Name: "cluster", Value: config.ClusterName}},
}

resourceIDMap := getResourceIDsFromObjs(objs)
resourceIDMap := NewResourceIDMap(objs)
resourceIDMap.Set("", "Cluster", config.ClusterName, clusterID)
resourceIDMap.Set("", "Cluster", "selfRef", clusterID) // For shorthand

// For incremental scrape, we do not have all the data in the resource ID map
// we use it from the cached resource id map
clusterResourceIDLock.Lock()
resourceIDMap = mergeResourceIDMap(resourceIDMap, allClustersResourceIDMap[string(ctx.ScrapeConfig().GetUID())])
allClustersResourceIDMap[string(ctx.ScrapeConfig().GetUID())] = resourceIDMap
clusterResourceIDLock.Unlock()
if isIncrementalScrape {
// On incremental scrape, we do not have all the data in the resource ID map.
// we use it from the cached resource id map.
resourceIDMap.data = resourceIDMapPerCluster.MergeAndUpdate(string(ctx.ScrapeConfig().GetUID()), resourceIDMap.data)
} else {
resourceIDMapPerCluster.Swap(string(ctx.ScrapeConfig().GetUID()), resourceIDMap.data)
}

objChangeExclusionByType, objChangeExclusionBySeverity := formObjChangeExclusionMap(objs)

Expand Down Expand Up @@ -403,7 +401,7 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc
var nodeName string
if spec["nodeName"] != nil {
nodeName = spec["nodeName"].(string)
nodeID := resourceIDMap[""]["Node"][nodeName]
nodeID := resourceIDMap.Get("", "Node", nodeName)
nodeExternalID := lo.CoalesceOrEmpty(nodeID, getKubernetesAlias("Node", "", nodeName))

relationships = append(relationships, v1.RelationshipResult{
Expand Down Expand Up @@ -460,12 +458,10 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc

for _, ownerRef := range obj.GetOwnerReferences() {
relationships = append(relationships, v1.RelationshipResult{
ConfigID: string(ownerRef.UID),
RelatedConfigID: string(obj.GetUID()),
Relationship: ownerRef.Kind + obj.GetKind(),
}.WithConfig(
resourceIDMap.Get(obj.GetNamespace(), ownerRef.Kind, ownerRef.Name),
v1.ExternalID{ExternalID: []string{string(ownerRef.UID)}, ConfigType: getConfigTypePrefix(ownerRef.APIVersion) + ownerRef.Kind},
))
})
}

for _, f := range config.Relationships {
Expand Down Expand Up @@ -612,7 +608,7 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc
}

results = append(results, changeResults...)
if withCluster {
if isIncrementalScrape {
results = append([]v1.ScrapeResult{cluster}, results...)
}

Expand All @@ -633,15 +629,15 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc

// getKubernetesParent returns a list of potential parents in order.
// Example: For a Pod the parents would be [Replicaset, Namespace, Cluster]
func getKubernetesParent(obj *unstructured.Unstructured, exclusions v1.KubernetesExclusionConfig, resourceIDMap map[string]map[string]map[string]string) []v1.ConfigExternalKey {
func getKubernetesParent(obj *unstructured.Unstructured, exclusions v1.KubernetesExclusionConfig, resourceIDMap *ResourceIDMapContainer) []v1.ConfigExternalKey {
var allParents []v1.ConfigExternalKey
allParents = append(allParents, v1.ConfigExternalKey{
Type: ConfigTypePrefix + "Cluster",
ExternalID: resourceIDMap[""]["Cluster"]["selfRef"],
ExternalID: resourceIDMap.Get("", "Cluster", "selfRef"),
})

if obj.GetNamespace() != "" {
parentExternalID := resourceIDMap[""]["Namespace"][obj.GetNamespace()]
parentExternalID := resourceIDMap.Get("", "Namespace", obj.GetNamespace())
if parentExternalID == "" {
// An incremental scraper may not have the Namespace object.
// We can instead use the alias as the external id.
Expand All @@ -661,7 +657,7 @@ func getKubernetesParent(obj *unstructured.Unstructured, exclusions v1.Kubernete
// be its Deployment
if obj.GetKind() == "Pod" && lo.Contains(exclusions.Kinds, "ReplicaSet") && ref.Kind == "ReplicaSet" {
deployName := extractDeployNameFromReplicaSet(ref.Name)
parentExternalID := resourceIDMap[obj.GetNamespace()]["Deployment"][deployName]
parentExternalID := resourceIDMap.Get(obj.GetNamespace(), "Deployment", deployName)
allParents = append([]v1.ConfigExternalKey{{
Type: ConfigTypePrefix + "Deployment",
ExternalID: parentExternalID,
Expand Down
73 changes: 62 additions & 11 deletions scrapers/kubernetes/resource_map.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,37 @@
package kubernetes

import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
import (
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// map<namespace><kind><name>: <id>
type ResourceIDMap map[string]map[string]map[string]string

func (t ResourceIDMap) Set(namespace, kind, name, id string) {
if t[namespace] == nil {
t[namespace] = make(map[string]map[string]string)
type ResourceIDMapContainer struct {
mu sync.RWMutex
data ResourceIDMap
}

func (t *ResourceIDMapContainer) Set(namespace, kind, name, id string) {
t.mu.Lock()
defer t.mu.Unlock()

if t.data[namespace] == nil {
t.data[namespace] = make(map[string]map[string]string)
}
if t[namespace][kind] == nil {
t[namespace][kind] = make(map[string]string)
if t.data[namespace][kind] == nil {
t.data[namespace][kind] = make(map[string]string)
}
t[namespace][kind][name] = id
t.data[namespace][kind][name] = id
}

func (t ResourceIDMap) Get(namespace, kind, name string) string {
if kinds, ok := t[namespace]; ok {
func (t *ResourceIDMapContainer) Get(namespace, kind, name string) string {
t.mu.RLock()
defer t.mu.RUnlock()

if kinds, ok := t.data[namespace]; ok {
if names, ok := kinds[kind]; ok {
if id, ok := names[name]; ok {
return id
Expand All @@ -27,7 +42,40 @@ func (t ResourceIDMap) Get(namespace, kind, name string) string {
return ""
}

func getResourceIDsFromObjs(objs []*unstructured.Unstructured) ResourceIDMap {
// PerClusterResourceIDMap caches one ResourceIDMap per cluster, keyed by the
// clusterID string passed to its methods. All access to data goes through mu.
//
// The zero value is ready to use: Swap and MergeAndUpdate allocate data on
// first write. Because it embeds a sync.Mutex, a value must not be copied
// after first use.
type PerClusterResourceIDMap struct {
// mu guards data.
mu sync.Mutex
// data maps clusterID -> cached ResourceIDMap; nil until the first write.
data map[string]ResourceIDMap
}

// Swap replaces whatever is cached for clusterID with resourceIDMap.
//
// The previous entry, if any, is discarded without being merged. The map is
// stored as given (it is not copied), so the caller and the cache share it.
func (t *PerClusterResourceIDMap) Swap(clusterID string, resourceIDMap ResourceIDMap) {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Common case: the cache already exists, just overwrite the entry.
	if t.data != nil {
		t.data[clusterID] = resourceIDMap
		return
	}

	// First write on a zero-value receiver: create the cache with this entry.
	t.data = map[string]ResourceIDMap{clusterID: resourceIDMap}
}

// MergeAndUpdate combines resourceIDMap with the entry cached for clusterID,
// stores the outcome back in the cache, and returns it.
//
// When nothing is cached for clusterID, resourceIDMap itself is stored and
// returned unchanged. Otherwise the result is whatever
// mergeResourceIDMap(resourceIDMap, cached) yields. The lookup, merge and
// store all happen while holding the lock.
func (t *PerClusterResourceIDMap) MergeAndUpdate(clusterID string, resourceIDMap ResourceIDMap) ResourceIDMap {
	t.mu.Lock()
	defer t.mu.Unlock()

	merged := resourceIDMap
	// Indexing a nil map is legal and reports "not found", so this lookup is
	// safe even before the cache has been allocated.
	if previous, found := t.data[clusterID]; found {
		merged = mergeResourceIDMap(resourceIDMap, previous)
	}

	if t.data == nil {
		// First write on a zero-value receiver.
		t.data = map[string]ResourceIDMap{clusterID: merged}
		return merged
	}

	t.data[clusterID] = merged
	return merged
}

func NewResourceIDMap(objs []*unstructured.Unstructured) *ResourceIDMapContainer {
resourceIDMap := make(map[string]map[string]map[string]string)
for _, obj := range objs {
if resourceIDMap[obj.GetNamespace()] == nil {
Expand All @@ -39,7 +87,10 @@ func getResourceIDsFromObjs(objs []*unstructured.Unstructured) ResourceIDMap {
resourceIDMap[obj.GetNamespace()][obj.GetKind()][obj.GetName()] = string(obj.GetUID())
}

return resourceIDMap
return &ResourceIDMapContainer{
data: resourceIDMap,
mu: sync.RWMutex{},
}
}

func mergeResourceIDMap(latest, cached ResourceIDMap) ResourceIDMap {
Expand Down

0 comments on commit 92592da

Please sign in to comment.