compact: build and cache a fresh chunks index, see borgbackup#8397
Uses ChunkIndex (a specialized, memory-efficient data
structure), so borg compact needs less memory now.

Also, compact now builds an up-to-date chunks index,
which is saved to cache/chunks in the store, so every
borg command can find it and fetch it from there instead
of building the chunks index ad hoc via repository.list(),
which can be rather slow.
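
A minimal sketch of the consumer side this enables: how another borg command might fetch the cached index instead of listing every object. It assumes repository.store_load() exists as the read counterpart of the store_store() call added below, and that ChunkIndex.read() accepts a file object the way ChunkIndex.write() does; both are assumptions to verify against the actual borg APIs.

import io

from ..hashindex import ChunkIndex


def load_cached_chunk_index(repository) -> ChunkIndex:
    """Sketch: fetch the chunks index that borg compact cached in the store."""
    # store_load() is assumed here to be the read counterpart of store_store().
    data = repository.store_load("cache/chunks")
    with io.BytesIO(data) as f:
        # assumed: ChunkIndex.read() mirrors the ChunkIndex.write(f) call
        # that save_chunk_index() uses below.
        return ChunkIndex.read(f)
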
ThomasWaldmann committed Sep 22, 2024
1 parent 5274548 commit ab3813f
Showing 1 changed file with 76 additions and 61 deletions.
137 changes: 76 additions & 61 deletions src/borg/archiver/compact_cmd.py
@@ -1,9 +1,11 @@
 import argparse
-from typing import Tuple, Dict
+import io
+from typing import Tuple, Set

 from ._common import with_repository
 from ..archive import Archive
 from ..constants import *  # NOQA
+from ..hashindex import ChunkIndex, ChunkIndexEntry
 from ..helpers import set_ec, EXIT_WARNING, EXIT_ERROR, format_file_size, bin_to_hex
 from ..helpers import ProgressIndicatorPercent
 from ..manifest import Manifest
@@ -20,48 +22,80 @@ def __init__(self, repository, manifest):
         self.repository = repository
         assert isinstance(repository, (Repository, RemoteRepository))
         self.manifest = manifest
-        self.repository_chunks = None  # what we have in the repository, id -> stored_size
-        self.used_chunks = None  # what archives currently reference
-        self.wanted_chunks = None  # chunks that would be nice to have for next borg check --repair
+        self.chunks = None  # a ChunkIndex, here used for: id -> (is_used, stored_size)
         self.total_files = None  # overall number of source files written to all archives in this repo
         self.total_size = None  # overall size of source file content data written to all archives
         self.archives_count = None  # number of archives

     @property
     def repository_size(self):
-        if self.repository_chunks is None:
+        if self.chunks is None:
             return None
-        return sum(self.repository_chunks.values())  # sum of stored sizes
+        return sum(entry.size for id, entry in self.chunks.iteritems())  # sum of stored sizes

     def garbage_collect(self):
         """Removes unused chunks from a repository."""
         logger.info("Starting compaction / garbage collection...")
         logger.info("Getting object IDs present in the repository...")
-        self.repository_chunks = self.get_repository_chunks()
+        self.chunks = self.get_repository_chunks()
         logger.info("Computing object IDs used by archives...")
-        (self.used_chunks, self.wanted_chunks, self.total_files, self.total_size, self.archives_count) = (
+        (self.missing_chunks, self.reappeared_chunks, self.total_files, self.total_size, self.archives_count) = (
             self.analyze_archives()
         )
         self.report_and_delete()
+        self.save_chunk_index()
         logger.info("Finished compaction / garbage collection...")

-    def get_repository_chunks(self) -> Dict[bytes, int]:
+    def get_repository_chunks(self) -> ChunkIndex:
         """Build a dict id -> size of all chunks present in the repository"""
-        repository_chunks = {}
+        chunks = ChunkIndex()
         marker = None
         while True:
             result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
             if not result:
                 break
             marker = result[-1][0]
             for id, stored_size in result:
-                repository_chunks[id] = stored_size
-        return repository_chunks
-
-    def analyze_archives(self) -> Tuple[Dict[bytes, int], Dict[bytes, int], int, int, int]:
+                # we add this id to the chunks index, using refcount == 0, because
+                # we do not know yet whether it is actually referenced from some archives.
+                # we "abuse" the size field here. usually there is the plaintext size,
+                # but we use it for the size of the stored object here.
+                chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
+        return chunks
+
+    def save_chunk_index(self):
+        # first clean up:
+        for id, entry in self.chunks.iteritems():
+            # we already deleted the unused chunks, so everything left must be used:
+            assert entry.refcount == ChunkIndex.MAX_VALUE
+            # as we put the wrong size in there, we need to clean up the size:
+            self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+        # now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
+        self.chunks.compact()  # vacuum the hash table
+        with io.BytesIO() as f:
+            self.chunks.write(f)
+            data = f.getvalue()
+        self.chunks.clear()  # free memory, immediately
+        self.chunks = None
+        self.repository.store_store("cache/chunks", data)
+
+    def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
         """Iterate over all items in all archives, create the dicts id -> size of all used/wanted chunks."""
-        used_chunks = {}  # chunks referenced by item.chunks
-        wanted_chunks = {}  # additional "wanted" chunks seen in item.chunks_healthy
+
+        def use_it(id, *, wanted=False):
+            entry = self.chunks.get(id)
+            if entry is not None:
+                # the chunk is in the repo, mark it used by setting refcount to max.
+                self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=entry.size)
+                if wanted:
+                    # chunk id is from chunks_healthy list: a lost chunk has re-appeared!
+                    reappeared_chunks.add(id)
+            else:
+                # we do NOT have this chunk in the repository!
+                missing_chunks.add(id)
+
+        missing_chunks: set[bytes] = set()
+        reappeared_chunks: set[bytes] = set()
         archive_infos = self.manifest.archives.list(sort_by=["ts"])
         num_archives = len(archive_infos)
         pi = ProgressIndicatorPercent(
@@ -73,79 +107,60 @@ def analyze_archives(self) -> Tuple[Dict[bytes, int], Dict[bytes, int], int, int
logger.info(f"Analyzing archive {info.name} {info.ts} {bin_to_hex(info.id)} ({i + 1}/{num_archives})")
archive = Archive(self.manifest, info.id)
# archive metadata size unknown, but usually small/irrelevant:
used_chunks[archive.id] = 0
use_it(archive.id)
for id in archive.metadata.item_ptrs:
used_chunks[id] = 0
use_it(id)
for id in archive.metadata.items:
used_chunks[id] = 0
use_it(id)
# archive items content data:
for item in archive.iter_items():
total_files += 1 # every fs object counts, not just regular files
if "chunks" in item:
for id, size in item.chunks:
total_size += size # original, uncompressed file content size
used_chunks[id] = size
use_it(id)
if "chunks_healthy" in item:
# we also consider the chunks_healthy chunks as referenced - do not throw away
# anything that borg check --repair might still need.
for id, size in item.chunks_healthy:
if id not in used_chunks:
wanted_chunks[id] = size
use_it(id, wanted=True)
pi.finish()
return used_chunks, wanted_chunks, total_files, total_size, num_archives
return missing_chunks, reappeared_chunks, total_files, total_size, num_archives

def report_and_delete(self):
run_repair = " Run borg check --repair!"

missing_new = set(self.used_chunks) - set(self.repository_chunks)
if missing_new:
logger.error(f"Repository has {len(missing_new)} new missing objects." + run_repair)
if self.missing_chunks:
logger.error(f"Repository has {len(self.missing_chunks)} missing objects." + run_repair)
set_ec(EXIT_ERROR)

missing_known = set(self.wanted_chunks) - set(self.repository_chunks)
if missing_known:
logger.warning(f"Repository has {len(missing_known)} known missing objects.")
set_ec(EXIT_WARNING)

missing_found = set(self.wanted_chunks) & set(self.repository_chunks)
if missing_found:
logger.warning(f"{len(missing_found)} previously missing objects re-appeared!" + run_repair)
if self.reappeared_chunks:
logger.warning(f"{len(self.reappeared_chunks)} previously missing objects re-appeared!" + run_repair)
set_ec(EXIT_WARNING)

repo_size_before = self.repository_size
referenced_chunks = set(self.used_chunks) | set(self.wanted_chunks)
unused = set(self.repository_chunks) - referenced_chunks
logger.info(f"Repository has {len(unused)} objects to delete.")
if unused:
logger.info(f"Deleting {len(unused)} unused objects...")
pi = ProgressIndicatorPercent(
total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
)
for i, id in enumerate(unused):
pi.show(i)
self.repository.delete(id)
del self.repository_chunks[id]
pi.finish()
logger.info("Determining unused objects...")
unused = set()
for id, entry in self.chunks.iteritems():
if entry.refcount == 0:
unused.add(id)
logger.info(f"Deleting {len(unused)} unused objects...")
pi = ProgressIndicatorPercent(
total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
)
for i, id in enumerate(unused):
pi.show(i)
self.repository.delete(id)
del self.chunks[id]
pi.finish()
repo_size_after = self.repository_size

count = len(self.repository_chunks)
count = len(self.chunks)
logger.info(f"Overall statistics, considering all {self.archives_count} archives in this repository:")
logger.info(
f"Source data size was {format_file_size(self.total_size, precision=0)} in {self.total_files} files."
)
dsize = 0
for id in self.repository_chunks:
if id in self.used_chunks:
dsize += self.used_chunks[id]
elif id in self.wanted_chunks:
dsize += self.wanted_chunks[id]
else:
raise KeyError(bin_to_hex(id))
logger.info(f"Repository size is {format_file_size(self.repository_size, precision=0)} in {count} objects.")
if self.total_size != 0:
logger.info(f"Space reduction factor due to deduplication: {dsize / self.total_size:.3f}")
if dsize != 0:
logger.info(f"Space reduction factor due to compression: {self.repository_size / dsize:.3f}")
logger.info(f"Repository size is {format_file_size(repo_size_after, precision=0)} in {count} objects.")
logger.info(f"Compaction saved {format_file_size(repo_size_before - repo_size_after, precision=0)}.")

