Skip to content

Commit

Permalink
dev(narugo): complete this part
Browse files Browse the repository at this point in the history
  • Loading branch information
narugo1992 committed Nov 27, 2024
1 parent 2bc860b commit 32b8e65
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 107 deletions.
8 changes: 8 additions & 0 deletions docs/source/api_doc/archive/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,11 @@ get_archive_extname



archive_splitext
----------------------------------------

.. autofunction:: archive_splitext




180 changes: 97 additions & 83 deletions hfutils/archive/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
"""
This module provides functionality for handling archive files in various formats.
It includes functions for registering custom archive types, packing directories into archives,
unpacking archives, and determining archive types based on file extensions. The module supports
a flexible system for working with different archive formats through a registration mechanism.
Archive handling module for managing different types of archive files.
.. note::
This module uses a global dictionary to store registered archive types, so it's
Expand All @@ -20,13 +16,18 @@

class ArchiveWriter:
"""
A base class for creating archive writers.
Base class for creating and managing archive writers.
This class provides a context manager interface for handling archive files,
allowing files to be added to the archive and ensuring proper resource management.
allowing for safe resource management and consistent file addition operations.
It serves as a template for specific archive format implementations.
:param archive_file: The path to the archive file to be created or modified.
:param archive_file: Path to the archive file to be created or modified.
:type archive_file: str
Example:
>>> with ArchiveWriter('output.zip') as writer:
... writer.add('file.txt', 'archive_path/file.txt')
"""

def __init__(self, archive_file: str):
Expand All @@ -35,35 +36,36 @@ def __init__(self, archive_file: str):

def _create_handler(self):
"""
Create the handler for the archive writer.
Create the underlying archive handler.
This method should be overridden by subclasses to provide specific
handler creation logic for different archive types.
This method should be implemented by subclasses to initialize the
specific archive format handler.
:raises NotImplementedError: If not overridden in a subclass.
:raises NotImplementedError: When called on the base class.
"""
raise NotImplementedError # pragma: no cover

def _add_file(self, filename: str, arcname: str):
"""
Add a file to the archive.
This method should be overridden by subclasses to define how files
are added to the archive for different formats.
This method should be implemented by subclasses to define the
specific file addition logic for each archive format.
:param filename: The path to the file to add to the archive.
:param filename: Path to the file to be added.
:type filename: str
:param arcname: The archive name for the file.
:param arcname: Desired path within the archive.
:type arcname: str
:raises NotImplementedError: If not overridden in a subclass.
:raises NotImplementedError: When called on the base class.
"""
raise NotImplementedError # pragma: no cover

def open(self):
"""
Open the archive for writing.
Initializes the handler if it has not been created yet.
Initializes the archive handler if it hasn't been created yet.
This method is automatically called when using the context manager.
"""
if self._handler is None:
self._handler = self._create_handler()
Expand All @@ -72,37 +74,43 @@ def add(self, filename: str, arcname: str):
"""
Add a file to the archive.
:param filename: The path to the file to add.
:param filename: Path to the file to be added.
:type filename: str
:param arcname: The name to use for the file within the archive.
:param arcname: Desired path within the archive.
:type arcname: str
"""
return self._add_file(filename, arcname)

def close(self):
"""
Close the archive.
Close the archive and release resources.
Ensures that all resources are properly released.
This method ensures proper cleanup of resources and is automatically
called when using the context manager.
"""
if self._handler is not None:
self._handler.close()
self._handler = None

def __enter__(self):
"""
Enter the runtime context related to this object.
Context manager entry point.
Opens the archive for writing.
:return: Self reference for use in context manager.
:rtype: ArchiveWriter
"""
self.open()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""
Exit the runtime context related to this object.
Context manager exit point.
Closes the archive, ensuring that resources are released.
Ensures proper cleanup of resources when exiting the context.
:param exc_type: Exception type if an error occurred.
:param exc_val: Exception value if an error occurred.
:param exc_tb: Exception traceback if an error occurred.
"""
self.close()

Expand All @@ -113,31 +121,28 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def register_archive_type(name: str, exts: List[str], fn_pack: Callable, fn_unpack: Callable, fn_writer: _FN_WRITER):
"""
Register a custom archive type with associated file extensions and packing/unpacking functions.
Register a new archive type with its associated handlers and extensions.
This function allows users to add support for new archive types by providing the necessary
information and functions to handle the archive format.
This function allows for the registration of custom archive formats by providing
the necessary functions for packing, unpacking, and creating archive writers.
:param name: The name of the archive type (e.g., 'zip', 'tar').
:param name: Identifier for the archive type (e.g., 'zip', 'tar').
:type name: str
:param exts: A list of file extensions associated with the archive type (e.g., ['.zip']).
:param exts: List of file extensions for this archive type (e.g., ['.zip']).
:type exts: List[str]
:param fn_pack: The packing function that takes a directory and an archive filename as input and creates an archive.
:param fn_pack: Function to create archives of this type.
:type fn_pack: Callable
:param fn_unpack: The unpacking function that takes an archive filename and a directory as input and extracts the archive.
:param fn_unpack: Function to extract archives of this type.
:type fn_unpack: Callable
:param fn_writer: The writer creation function that takes an archive filename and creates an archive writer object.
:param fn_writer: Function to create an archive writer instance.
:type fn_writer: Callable[[str], ArchiveWriter]
:raises ValueError: If no file extensions are provided for the archive type.
:raises ValueError: If no file extensions are provided.
Example:
>>> def custom_pack(directory, archive_file, **kwargs):
... # Custom packing logic here
... pass
>>> def custom_unpack(archive_file, directory, **kwargs):
... # Custom unpacking logic here
... pass
>>> register_archive_type('custom', ['.cst'], custom_pack, custom_unpack)
>>> def my_pack(directory, archive_file, **kwargs): pass
>>> def my_unpack(archive_file, directory, **kwargs): pass
>>> def my_writer(archive_file): return CustomWriter(archive_file)
>>> register_archive_type('custom', ['.cst'], my_pack, my_unpack, my_writer)
"""
if len(exts) == 0:
raise ValueError(f'At least one extension name for archive type {name!r} should be provided.')
Expand All @@ -146,18 +151,17 @@ def register_archive_type(name: str, exts: List[str], fn_pack: Callable, fn_unpa

def get_archive_extname(type_name: str) -> str:
"""
Get the file extension associated with a registered archive type.
This function returns the first (primary) file extension associated with the given archive type.
Retrieve the primary file extension for a registered archive type.
:param type_name: The name of the archive type.
:param type_name: Name of the archive type.
:type type_name: str
:return: The file extension associated with the archive type.
:return: Primary file extension for the archive type.
:rtype: str
:raises ValueError: If the archive type is not registered.
Example:
>>> get_archive_extname('zip')
>>> ext = get_archive_extname('zip')
>>> print(ext)
'.zip'
"""
if type_name in _KNOWN_ARCHIVE_TYPES:
Expand All @@ -170,26 +174,24 @@ def get_archive_extname(type_name: str) -> str:
def archive_pack(type_name: str, directory: str, archive_file: str,
pattern: Optional[str] = None, silent: bool = False, clear: bool = False):
"""
Pack a directory into an archive file using the specified archive type.
Create an archive from a directory using the specified archive type.
This function creates an archive of the specified type containing the contents of the given directory.
:param type_name: The name of the archive type.
:param type_name: Name of the archive type to use.
:type type_name: str
:param directory: The directory to pack.
:param directory: Source directory to archive.
:type directory: str
:param archive_file: The filename of the resulting archive.
:param archive_file: Output archive file path.
:type archive_file: str
:param pattern: A pattern to filter files for inclusion in the archive (optional).
:param pattern: Optional file pattern for filtering (e.g., '*.txt').
:type pattern: str, optional
:param silent: If True, suppress warnings during the packing process.
:param silent: Whether to suppress warnings.
:type silent: bool
:param clear: If True, remove existing files when packing.
:param clear: Whether to remove existing files when packing.
:type clear: bool
:raises ValueError: If the archive type is not registered.
Example:
>>> archive_pack('zip', '/path/to/directory', '/path/to/archive.zip', pattern='*.txt')
>>> archive_pack('zip', '/data', 'backup.zip', pattern='*.dat', silent=True)
"""
exts, fn_pack, _, _ = _KNOWN_ARCHIVE_TYPES[type_name]
if not any(os.path.normcase(archive_file).endswith(extname) for extname in exts):
Expand All @@ -202,19 +204,17 @@ def archive_pack(type_name: str, directory: str, archive_file: str,

def get_archive_type(archive_file: str) -> str:
"""
Determine the archive type based on the file extension.
This function examines the file extension of the given archive file and returns the
corresponding archive type name.
Determine the archive type from a file's extension.
:param archive_file: The filename of the archive.
:param archive_file: Path to the archive file.
:type archive_file: str
:return: The name of the archive type.
:return: Name of the detected archive type.
:rtype: str
:raises ValueError: If the file extension is not associated with any registered archive type.
:raises ValueError: If the file extension doesn't match any registered type.
Example:
>>> get_archive_type('/path/to/archive.tar.gz')
>>> type_name = get_archive_type('data.tar.gz')
>>> print(type_name)
'gztar'
"""
archive_file = os.path.normcase(archive_file)
Expand All @@ -227,22 +227,20 @@ def get_archive_type(archive_file: str) -> str:

def archive_unpack(archive_file: str, directory: str, silent: bool = False, password: Optional[str] = None):
"""
Unpack an archive file into a directory using the specified archive type.
Extract an archive file to a directory.
This function extracts the contents of the given archive file into the specified directory.
:param archive_file: The filename of the archive.
:param archive_file: Path to the archive file to extract.
:type archive_file: str
:param directory: The directory to unpack the contents into.
:param directory: Destination directory for extraction.
:type directory: str
:param silent: If True, suppress warnings during the unpacking process.
:param silent: Whether to suppress warnings.
:type silent: bool
:param password: The password to extract the archive file (optional).
:param password: Optional password for protected archives.
:type password: str, optional
:raises ValueError: If the archive type is not recognized.
Example:
>>> archive_unpack('/path/to/archive.zip', '/path/to/extract')
>>> archive_unpack('protected.zip', 'output_dir', password='secret')
"""
type_name = get_archive_type(archive_file)
_, _, fn_unpack, _ = _KNOWN_ARCHIVE_TYPES[type_name]
Expand All @@ -251,22 +249,19 @@ def archive_unpack(archive_file: str, directory: str, silent: bool = False, pass

def archive_writer(type_name: str, archive_file: str) -> ArchiveWriter:
"""
Create an ArchiveWriter instance for the specified archive type.
This function returns an ArchiveWriter that can be used to add files to an archive.
Create an archive writer for the specified archive type.
:param type_name: The name of the archive type.
:param type_name: Name of the archive type.
:type type_name: str
:param archive_file: The filename of the archive to be created or modified.
:param archive_file: Path to the archive file to create.
:type archive_file: str
:return: An ArchiveWriter instance for the specified archive type.
:return: An archive writer instance.
:rtype: ArchiveWriter
:raises ValueError: If the archive type is not registered.
Example:
>>> writer = archive_writer('zip', '/path/to/archive.zip')
>>> with writer as w:
... w.add('/path/to/file.txt', 'file.txt')
>>> with archive_writer('zip', 'output.zip') as writer:
... writer.add('file.txt', 'docs/file.txt')
"""
exts, _, _, fn_writer = _KNOWN_ARCHIVE_TYPES[type_name]
if not any(os.path.normcase(archive_file).endswith(extname) for extname in exts):
Expand All @@ -279,11 +274,30 @@ def archive_writer(type_name: str, archive_file: str) -> ArchiveWriter:

@lru_cache()
def _get_all_extensions():
"""
Get a list of all registered archive extensions.
:return: List of all registered file extensions.
:rtype: list
"""
extensions = []
for type_name, (exts, _, _, _) in _KNOWN_ARCHIVE_TYPES.items():
extensions.extend(exts)
return extensions

Check warning on line 286 in hfutils/archive/base.py

View check run for this annotation

Codecov / codecov/patch

hfutils/archive/base.py#L283-L286

Added lines #L283 - L286 were not covered by tests


def archive_splitext(filename: str) -> Tuple[str, str]:
"""
Split a filename into root and extension, handling compound extensions.
:param filename: The filename to split.
:type filename: str
:return: Tuple of (root, extension).
:rtype: Tuple[str, str]
Example:
>>> root, ext = archive_splitext('data.tar.gz')
>>> print(root, ext)
'data' '.tar.gz'
"""
return splitext_with_composite(filename, _get_all_extensions())

Check warning on line 303 in hfutils/archive/base.py

View check run for this annotation

Codecov / codecov/patch

hfutils/archive/base.py#L303

Added line #L303 was not covered by tests
Loading

0 comments on commit 32b8e65

Please sign in to comment.