Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: GC on oci.Store.Delete #653

Merged
merged 26 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 86 additions & 20 deletions content/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/graph"
"oras.land/oras-go/v2/internal/resolver"
"oras.land/oras-go/v2/registry"
)

// Store implements `oras.Target`, and represents a content store
Expand All @@ -52,12 +53,24 @@ type Store struct {
// to manually call SaveIndex() when needed.
// - Default value: true.
AutoSaveIndex bool
root string
indexPath string
index *ocispec.Index
storage *Storage
tagResolver *resolver.Memory
graph *graph.Memory

// AutoGC controls if the OCI store will automatically clean newly produced
// dangling nodes during Delete() operation. Dangling nodes are those without
// any predecessors, such as blobs whose manifests have been deleted.
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
// - Default value: true.
AutoGC bool

// AutoRemoveReferrers controls if the OCI store will automatically delete its
// referrers when a manifest is deleted.
// - Default value: true.
AutoRemoveReferrers bool
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved

root string
indexPath string
index *ocispec.Index
storage *Storage
tagResolver *resolver.Memory
graph *graph.Memory

// sync ensures that most operations can be done concurrently, while Delete
// has the exclusive access to Store if a delete operation is underway. Operations
Expand All @@ -84,12 +97,14 @@ func NewWithContext(ctx context.Context, root string) (*Store, error) {
}

store := &Store{
AutoSaveIndex: true,
root: rootAbs,
indexPath: filepath.Join(rootAbs, ocispec.ImageIndexFile),
storage: storage,
tagResolver: resolver.NewMemory(),
graph: graph.NewMemory(),
AutoSaveIndex: true,
AutoGC: true,
AutoRemoveReferrers: true,
root: rootAbs,
indexPath: filepath.Join(rootAbs, ocispec.ImageIndexFile),
storage: storage,
tagResolver: resolver.NewMemory(),
graph: graph.NewMemory(),
}

if err := ensureDir(filepath.Join(rootAbs, ocispec.ImageBlobsDir)); err != nil {
Expand Down Expand Up @@ -143,11 +158,61 @@ func (s *Store) Exists(ctx context.Context, target ocispec.Descriptor) (bool, er

// Delete deletes the content matching the descriptor from the store. Delete may
// fail on certain systems (i.e. NTFS), if there is a process (i.e. an unclosed
// Reader) using target.
// Reader) using target. If s.AutoGC is set to true, Delete will recursively
// remove the dangling nodes caused by the current delete. If s.AutoRemoveReferrers
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
// is set to true, Delete will recursively remove the referrers of the manifests
// being deleted.
func (s *Store) Delete(ctx context.Context, target ocispec.Descriptor) error {
s.sync.Lock()
defer s.sync.Unlock()
var deleteQueue []ocispec.Descriptor
var alwaysDelete []bool
deleteQueue = append(deleteQueue, target)
alwaysDelete = append(alwaysDelete, true)

for len(deleteQueue) > 0 {
node := deleteQueue[0]
mustDelete := alwaysDelete[0]
deleteQueue = deleteQueue[1:]
alwaysDelete = alwaysDelete[1:]

// get referrers if applicable
var referrers []ocispec.Descriptor
if descriptor.IsManifest(node) && s.AutoRemoveReferrers {
res, err := registry.Referrers(ctx, s, node, "")
if err != nil {
return err
}
referrers = append(referrers, res...)
}
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
for _, ref := range referrers {
// referrers must be deleted if s.AutoRemoveReferrers is true
deleteQueue = append(deleteQueue, ref)
alwaysDelete = append(alwaysDelete, true)
}

// delete the head of queue if applicable
s.sync.Lock()
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
// do not delete existing manifests in tagResolver
_, err := s.tagResolver.Resolve(ctx, string(node.Digest))
if mustDelete || err == errdef.ErrNotFound {
danglings, err := s.delete(ctx, node)
if err != nil {
return err
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
}
if s.AutoGC {
deleteQueue = append(deleteQueue, danglings...)
alwaysDelete = append(alwaysDelete, false)
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
}
} else if err != nil {
return err
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
}
s.sync.Unlock()
}

return nil
}

// delete deletes one node and returns the dangling nodes caused by the delete.
func (s *Store) delete(ctx context.Context, target ocispec.Descriptor) ([]ocispec.Descriptor, error) {
resolvers := s.tagResolver.Map()
untagged := false
for reference, desc := range resolvers {
Expand All @@ -156,16 +221,17 @@ func (s *Store) Delete(ctx context.Context, target ocispec.Descriptor) error {
untagged = true
}
}
if err := s.graph.Remove(ctx, target); err != nil {
return err
}
danglings := s.graph.Remove(target)
if untagged && s.AutoSaveIndex {
err := s.saveIndex()
if err != nil {
return err
return nil, err
}
}
return s.storage.Delete(ctx, target)
if err := s.storage.Delete(ctx, target); err != nil {
return nil, err
}
return danglings, nil
}

// Tag tags a descriptor with a reference string.
Expand Down
155 changes: 155 additions & 0 deletions content/oci/oci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,7 @@ func TestStore_PredecessorsAndDelete(t *testing.T) {
if err != nil {
t.Fatal("New() error =", err)
}
s.AutoGC = false
ctx := context.Background()

// generate test content
Expand Down Expand Up @@ -2261,6 +2262,160 @@ func TestStore_PredecessorsAndDelete(t *testing.T) {
}
}

func TestStore_DeleteWithAutoGC(t *testing.T) {
tempDir := t.TempDir()
s, err := New(tempDir)
if err != nil {
t.Fatal("New() error =", err)
}
ctx := context.Background()

// generate test content
var blobs [][]byte
var descs []ocispec.Descriptor
appendBlob := func(mediaType string, blob []byte) {
blobs = append(blobs, blob)
descs = append(descs, ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(blob),
Size: int64(len(blob)),
})
}
generateManifest := func(config ocispec.Descriptor, subject *ocispec.Descriptor, layers ...ocispec.Descriptor) {
manifest := ocispec.Manifest{
Config: config,
Subject: subject,
Layers: layers,
}
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageManifest, manifestJSON)
}
generateIndex := func(manifests ...ocispec.Descriptor) {
index := ocispec.Index{
Manifests: manifests,
}
indexJSON, err := json.Marshal(index)
if err != nil {
t.Fatal(err)
}
appendBlob(ocispec.MediaTypeImageIndex, indexJSON)
}

appendBlob(ocispec.MediaTypeImageConfig, []byte("config")) // Blob 0
appendBlob(ocispec.MediaTypeImageLayer, []byte("foo")) // Blob 1
appendBlob(ocispec.MediaTypeImageLayer, []byte("bar")) // Blob 2
appendBlob(ocispec.MediaTypeImageLayer, []byte("hello")) // Blob 3
generateManifest(descs[0], nil, descs[1]) // Blob 4
generateManifest(descs[0], nil, descs[2]) // Blob 5
generateManifest(descs[0], nil, descs[3]) // Blob 6
generateIndex(descs[4:6]...) // Blob 7
generateIndex(descs[6]) // Blob 8
appendBlob(ocispec.MediaTypeImageLayer, []byte("world")) // Blob 9
generateManifest(descs[0], &descs[6], descs[9]) // Blob 10
generateManifest(descs[0], &descs[10], descs[2]) // Blob 11

eg, egCtx := errgroup.WithContext(ctx)
for i := range blobs {
eg.Go(func(i int) func() error {
return func() error {
err := s.Push(egCtx, descs[i], bytes.NewReader(blobs[i]))
if err != nil {
return fmt.Errorf("failed to push test content to src: %d: %v", i, err)
}
return nil
}
}(i))
}
if err := eg.Wait(); err != nil {
t.Fatal(err)
}

// delete blob 4 and verify the result
if err := s.Delete(egCtx, descs[4]); err != nil {
t.Fatal(err)
}

// blob 1 and 4 are now deleted, and other blobs are still present
notPresent := []ocispec.Descriptor{descs[1], descs[4]}
for _, node := range notPresent {
if exists, _ := s.Exists(egCtx, node); exists {
t.Errorf("%v should not exist in store", node)
}
}
stillPresent := []ocispec.Descriptor{descs[0], descs[2], descs[3], descs[5], descs[6], descs[7], descs[8], descs[9], descs[10], descs[11]}
for _, node := range stillPresent {
if exists, _ := s.Exists(egCtx, node); !exists {
t.Errorf("%v should exist in store", node)
}
}

// delete blob 8 and verify the result
if err := s.Delete(egCtx, descs[8]); err != nil {
t.Fatal(err)
}

// blob 1, 4 and 8 are now deleted, and other blobs are still present
notPresent = []ocispec.Descriptor{descs[1], descs[4], descs[8]}
for _, node := range notPresent {
if exists, _ := s.Exists(egCtx, node); exists {
t.Errorf("%v should not exist in store", node)
}
}
stillPresent = []ocispec.Descriptor{descs[0], descs[2], descs[3], descs[5], descs[6], descs[7], descs[9], descs[10], descs[11]}
for _, node := range stillPresent {
if exists, _ := s.Exists(egCtx, node); !exists {
t.Errorf("%v should exist in store", node)
}
}

// delete blob 6 and verify the result
if err := s.Delete(egCtx, descs[6]); err != nil {
t.Fatal(err)
}

// blob 1, 3, 4, 6, 8, 9, 10, 11 are now deleted, and other blobs are still present
notPresent = []ocispec.Descriptor{descs[1], descs[3], descs[4], descs[6], descs[8], descs[9], descs[10], descs[11]}
for _, node := range notPresent {
if exists, _ := s.Exists(egCtx, node); exists {
t.Errorf("%v should not exist in store", node)
}
}
stillPresent = []ocispec.Descriptor{descs[0], descs[2], descs[5], descs[7]}
for _, node := range stillPresent {
if exists, _ := s.Exists(egCtx, node); !exists {
t.Errorf("%v should exist in store", node)
}
}

// verify predecessors information
wants := [][]ocispec.Descriptor{
{descs[5]}, // Blob 0
nil, // Blob 1
{descs[5]}, // Blob 2
nil, // Blob 3
{descs[7]}, // Blob 4's predecessor is descs[7], even though blob 4 no longer exist
{descs[7]}, // Blob 5
nil, // Blob 6
nil, // Blob 7
nil, // Blob 8
nil, // Blob 9
nil, // Blob 10
nil, // Blob 11
}
for i, want := range wants {
predecessors, err := s.Predecessors(ctx, descs[i])
if err != nil {
t.Errorf("Store.Predecessors(%d) error = %v", i, err)
}
if !equalDescriptorSet(predecessors, want) {
t.Errorf("Store.Predecessors(%d) = %v, want %v", i, predecessors, want)
}
}
}

func TestStore_Untag(t *testing.T) {
content := []byte("test delete")
desc := ocispec.Descriptor{
Expand Down
32 changes: 27 additions & 5 deletions internal/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,23 @@ import (

// Memory is a memory based PredecessorFinder.
type Memory struct {
nodes map[descriptor.Descriptor]ocispec.Descriptor // nodes saves the map keys of ocispec.Descriptor
// properties and behaviors of the fields:
// - Memory.nodes:
// 1. a node exists in Memory.nodes if and only if it exists in the memory
// 2. Memory.nodes saves the ocispec.Descriptor map keys, which are used by
// the other fields.
// - Memory.predecessors:
// 1. a node exists in Memory.predecessors if it has at least one predecessor
// in the memory, regardless of whether or not the node itself exists in
// the memory.
// 2. a node does not exist in Memory.predecessors, if it doesn't have any predecessors
// in the memory.
// - Memory.successors:
// 1. a node exists in Memory.successors if and only if it exists in the memory.
// 2. A node's entry in Memory.successors is always consistent with the actual
// content of the node, regardless of whether or not each successor exists
// in the memory.
wangxiaoxuan273 marked this conversation as resolved.
Show resolved Hide resolved
nodes map[descriptor.Descriptor]ocispec.Descriptor
predecessors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
successors map[descriptor.Descriptor]set.Set[descriptor.Descriptor]
lock sync.RWMutex
Expand Down Expand Up @@ -101,26 +117,32 @@ func (m *Memory) Predecessors(_ context.Context, node ocispec.Descriptor) ([]oci
return res, nil
}

// Remove removes the node from its predecessors and successors.
func (m *Memory) Remove(ctx context.Context, node ocispec.Descriptor) error {
// Remove removes the node from its predecessors and successors, and returns the
// dangling root nodes caused by the deletion.
func (m *Memory) Remove(node ocispec.Descriptor) []ocispec.Descriptor {
m.lock.Lock()
defer m.lock.Unlock()

nodeKey := descriptor.FromOCI(node)
var danglings []ocispec.Descriptor
// remove the node from its successors' predecessor list
for successorKey := range m.successors[nodeKey] {
predecessorEntry := m.predecessors[successorKey]
predecessorEntry.Delete(nodeKey)

// if none of the predecessors of the node still exists, we remove the
// predecessors entry. Otherwise, we do not remove the entry.
// predecessors entry and return it as a dangling node. Otherwise, we do
// not remove the entry.
if len(predecessorEntry) == 0 {
delete(m.predecessors, successorKey)
if _, exists := m.nodes[successorKey]; exists {
danglings = append(danglings, m.nodes[successorKey])
}
}
}
delete(m.successors, nodeKey)
delete(m.nodes, nodeKey)
return nil
return danglings
}

// index indexes predecessors for each direct successor of the given node.
Expand Down
Loading
Loading