Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: soft delete #622

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
40 changes: 34 additions & 6 deletions pkg/core/entity/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ const (

// ResourceGroup represents information required to locate a resource or multi resources.
type ResourceGroup struct {
Cluster string `json:"cluster,omitempty" yaml:"cluster,omitempty"`
APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"`
Kind string `json:"kind,omitempty" yaml:"kind,omitempty"`
Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty"`
Name string `json:"name,omitempty" yaml:"name,omitempty"`
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
Cluster string `json:"cluster,omitempty" yaml:"cluster,omitempty"`
APIVersion string `json:"apiVersion,omitempty" yaml:"apiVersion,omitempty"`
Kind string `json:"kind,omitempty" yaml:"kind,omitempty"`
Namespace string `json:"namespace,omitempty" yaml:"namespace,omitempty"`
Name string `json:"name,omitempty" yaml:"name,omitempty"`
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"`
}

Expand Down Expand Up @@ -89,6 +89,34 @@ func (rg *ResourceGroup) Hash() ResourceGroupHash {
return ResourceGroupHash(hash.String())
}

// ToTerms converts the ResourceGroup to ES query terms.
func (rg *ResourceGroup) ToTerms() map[string]any {
terms := map[string]any{}

setIfNotEmpty := func(key string, val any) {
switch val := val.(type) {
case string:
if len(val) != 0 {
terms[key] = val
}
case map[string]string:
if len(val) != 0 {
terms[key] = val
}
}
}

setIfNotEmpty("cluster", rg.Cluster)
setIfNotEmpty("apiVersion", rg.APIVersion)
setIfNotEmpty("kind", rg.Kind)
setIfNotEmpty("namespace", rg.Namespace)
setIfNotEmpty("name", rg.Name)
setIfNotEmpty("labels", rg.Labels)
setIfNotEmpty("annotations", rg.Annotations)

return terms
}

// ToSQL generates a SQL query string based on the ResourceGroup.
func (rg *ResourceGroup) ToSQL() string {
conditions := []string{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/handler/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func SearchForResource(searchMgr *search.SearchManager, searchStorage storage.Se
rt.Items = append(rt.Items, search.UniResource{
Cluster: res.Cluster,
Object: obj,
SyncAt: res.SyncAt,
Deleted: res.Deleted,
})
}
rt.Total = res.Total
Expand Down
22 changes: 20 additions & 2 deletions pkg/core/manager/insight/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,38 @@ import (
"github.com/KusionStack/karpor/pkg/core/handler"
"github.com/KusionStack/karpor/pkg/infra/multicluster"
topologyutil "github.com/KusionStack/karpor/pkg/util/topology"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8syaml "sigs.k8s.io/yaml"
)

// GetResource returns the unstructured cluster object for a given cluster.
func (i *InsightManager) GetResource(
// getResource gets the resource from the cluster or storage.
func (i *InsightManager) getResource(
ctx context.Context, client *multicluster.MultiClusterClient, resourceGroup *entity.ResourceGroup,
) (*unstructured.Unstructured, error) {
resourceGVR, err := topologyutil.GetGVRFromGVK(resourceGroup.APIVersion, resourceGroup.Kind)
if err != nil {
return nil, err
}
resource, err := client.DynamicClient.Resource(resourceGVR).Namespace(resourceGroup.Namespace).Get(ctx, resourceGroup.Name, metav1.GetOptions{})

if err != nil && k8serrors.IsNotFound(err) {
if r, err := i.search.SearchByTerms(ctx, resourceGroup.ToTerms(), nil); err == nil && len(r.Resources) > 0 {
resource = &unstructured.Unstructured{}
resource.SetUnstructuredContent(r.Resources[0].Object)
return resource, nil
}
}

return resource, err
}

// GetResource returns the unstructured cluster object for a given cluster.
func (i *InsightManager) GetResource(
ctx context.Context, client *multicluster.MultiClusterClient, resourceGroup *entity.ResourceGroup,
) (*unstructured.Unstructured, error) {
resource, err := i.getResource(ctx, client, resourceGroup)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/manager/search/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func NewSearchManager() *SearchManager {
type UniResource struct {
Cluster string `json:"cluster"`
Object any `json:"object"`
SyncAt string `json:"syncAt"`
Deleted bool `json:"deleted"`
}

type UniResourceList struct {
Expand Down
21 changes: 21 additions & 0 deletions pkg/infra/persistence/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,27 @@ func (cl *Client) GetDocument(
return getResp.Source, nil
}

// UpdateDocument updates a document with the specified ID
func (cl *Client) UpdateDocument(
ctx context.Context,
indexName string,
documentID string,
body io.Reader,
) error {
resp, err := cl.client.Update(indexName, documentID, body, cl.client.Update.WithContext(ctx))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
return &ESError{
StatusCode: resp.StatusCode,
Message: resp.String(),
}
}
return nil
}

// DeleteDocument deletes a document with the specified ID
func (cl *Client) DeleteDocument(ctx context.Context, indexName string, documentID string) error {
if _, err := cl.GetDocument(ctx, indexName, documentID); err != nil {
Expand Down
40 changes: 40 additions & 0 deletions pkg/infra/search/storage/elasticsearch/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/KusionStack/karpor/pkg/infra/search/storage"
"github.com/elliotxx/esquery"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -39,6 +41,8 @@ const (
resourceKeyOwnerReferences = "ownerReferences"
resourceKeyResourceVersion = "resourceVersion"
resourceKeyContent = "content"
resourceKeySyncAt = "syncAt" // resource save/update/delete time
resourceKeyDeleted = "deleted" // indicates whether the resource is deleted in cluster
)

var ErrNotFound = fmt.Errorf("object not found")
Expand All @@ -52,6 +56,40 @@ func (s *Storage) SaveResource(ctx context.Context, cluster string, obj runtime.
return s.client.SaveDocument(ctx, s.resourceIndexName, id, bytes.NewReader(body))
}

// Refresh will update ES index. If you want the previous document changes to be
// searchable immediately, you need to call refresh manually.
//
// Refer to https://www.elastic.co/guide/en/elasticsearch/guide/current/near-real-time.html to see detail.
func (s *Storage) Refresh(ctx context.Context) error {
return s.client.Refresh(ctx, s.resourceIndexName)
}

// SoftDeleteResource only sets the deleted field to true, not really deletes the data in storage.
func (s *Storage) SoftDeleteResource(ctx context.Context, cluster string, obj runtime.Object) error {
unObj, ok := obj.(*unstructured.Unstructured)
if !ok {
// TODO: support other implement of runtime.Object
return fmt.Errorf("only support *unstructured.Unstructured type")
}

if err := s.GetResource(ctx, cluster, unObj); err != nil {
return err
}

body, err := json.Marshal(map[string]map[string]interface{}{
"doc": {
resourceKeySyncAt: time.Now(),
resourceKeyDeleted: true,
},
})
if err != nil {
return err
}

id := string(unObj.GetUID())
return s.client.UpdateDocument(ctx, s.resourceIndexName, id, bytes.NewReader(body))
}

// DeleteResource removes an object from the Elasticsearch storage for the specified cluster.
func (s *Storage) DeleteResource(ctx context.Context, cluster string, obj runtime.Object) error {
unObj, ok := obj.(*unstructured.Unstructured)
Expand Down Expand Up @@ -142,6 +180,8 @@ func (s *Storage) generateResourceDocument(cluster string, obj runtime.Object) (
resourceKeyOwnerReferences: metaObj.GetOwnerReferences(),
resourceKeyResourceVersion: metaObj.GetResourceVersion(),
resourceKeyContent: buf.String(),
resourceKeySyncAt: time.Now(),
resourceKeyDeleted: false,
})
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/infra/search/storage/elasticsearch/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Storage) searchByDSL(ctx context.Context, dslStr string, pagination *st

// searchBySQL performs a search operation using an SQL string and pagination settings.
func (s *Storage) searchBySQL(ctx context.Context, sqlStr string, pagination *storage.Pagination) (*storage.SearchResult, error) {
dsl, _, err := sql2es.Convert(sqlStr)
dsl, _, err := sql2es.ConvertWithDefaultFilter(sqlStr, &sql2es.DeletedFilter)
if err != nil {
return nil, err
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/infra/search/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type ResourceStorage interface {
DeleteResource(ctx context.Context, cluster string, obj runtime.Object) error
DeleteAllResources(ctx context.Context, cluster string) error
CountResources(ctx context.Context) (int, error)
SoftDeleteResource(ctx context.Context, cluster string, obj runtime.Object) error
Refresh(ctx context.Context) error
}

// ResourceGroupRuleStorage interface defines the basic operations for resource
Expand Down Expand Up @@ -162,6 +164,8 @@ func (r *SearchResult) ToYAML() (string, error) {
type Resource struct {
entity.ResourceGroup `json:",inline" yaml:",inline"`
Object map[string]interface{} `json:"object"`
SyncAt string `json:"syncAt,omitempty"`
Deleted bool `json:"deleted,omitempty"`
}

// NewResource creates a new Resource instance based on the provided bytes
Expand Down Expand Up @@ -204,6 +208,14 @@ func Map2Resource(in map[string]interface{}) (*Resource, error) {
out.Namespace = in["namespace"].(string)
out.Name = in["name"].(string)

// These two fields are newly added, so they don't exist in the old data.
if v, ok := in["syncAt"]; ok {
out.SyncAt = v.(string)
}
if v, ok := in["deleted"]; ok {
out.Deleted = v.(bool)
}

content := in["content"].(string)
obj := &unstructured.Unstructured{}
decoder := yamlutil.NewYAMLOrJSONDecoder(bytes.NewBufferString(content), len(content))
Expand Down
3 changes: 3 additions & 0 deletions pkg/kubernetes/apis/search/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type ResourceSyncRule struct {

// TrimRefName is the name of the TrimRule.
TrimRefName string `json:"trimRefName,omitempty"`

// RemainAfterDeleted indicates whether the resource should remain in ES after being deleted in k8s.
RemainAfterDeleted bool
}

// +genclient
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubernetes/apis/search/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ type ResourceSyncRule struct {
// TrimRefName is the name of the TrimRule.
// +optional
TrimRefName string `json:"trimRefName,omitempty"`

// RemainAfterDeleted indicates whether the resource should remain in ES after being deleted in k8s.
// +optional
RemainAfterDeleted bool `json:"remainAfterDeleted,omitempty"`
}

// +genclient
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/kubernetes/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions pkg/syncer/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/KusionStack/karpor/pkg/infra/search/storage"
"github.com/KusionStack/karpor/pkg/infra/search/storage/elasticsearch"
"github.com/KusionStack/karpor/pkg/kubernetes/apis/search/v1beta1"
"github.com/KusionStack/karpor/pkg/syncer/internal"
"github.com/KusionStack/karpor/pkg/syncer/jsonextracter"
Expand Down Expand Up @@ -164,7 +163,7 @@ func (s *informerSource) Stop(ctx context.Context) error {
}

// createInformer sets up and returns the informer and controller for the informerSource, using the provided context, event handler, workqueue, and predicates.
func (s *informerSource) createInformer(ctx context.Context, handler ctrlhandler.EventHandler, queue workqueue.RateLimitingInterface, predicates ...predicate.Predicate) (clientgocache.Store, clientgocache.Controller, error) {
func (s *informerSource) createInformer(_ context.Context, handler ctrlhandler.EventHandler, queue workqueue.RateLimitingInterface, predicates ...predicate.Predicate) (clientgocache.Store, clientgocache.Controller, error) {
gvr, err := parseGVR(&s.ResourceSyncRule)
if err != nil {
return nil, nil, errors.Wrap(err, "error parsing GroupVersionResource")
Expand Down Expand Up @@ -196,11 +195,6 @@ func (s *informerSource) createInformer(ctx context.Context, handler ctrlhandler

h := &internal.EventHandler{EventHandler: handler, Queue: queue, Predicates: predicates}
cache, informer := clientgocache.NewTransformingInformer(lw, &unstructured.Unstructured{}, resyncPeriod, h, trim)
// TODO: Use interface instead of struct
importer := utils.NewESImporter(s.storage.(*elasticsearch.Storage), s.cluster, gvr)
if err = importer.ImportTo(ctx, cache); err != nil {
return nil, nil, err
}
return cache, informer, nil
}

Expand Down
Loading
Loading