Skip to content

Commit

Permalink
Merge pull request #54 from deepghs/dev/archive
Browse files Browse the repository at this point in the history
dev(narugo): add archive writer system
  • Loading branch information
narugo1992 authored Nov 27, 2024
2 parents 34e0876 + 2229f57 commit ae33c69
Show file tree
Hide file tree
Showing 213 changed files with 515 additions and 83 deletions.
15 changes: 15 additions & 0 deletions docs/source/api_doc/archive/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ archive_unpack



archive_writer
----------------------------------------

.. autofunction:: archive_writer



ArchiveWriter
----------------------------------------

.. autoclass:: ArchiveWriter
:members: __init__, open, add, close, __enter__, __exit__



get_archive_type
----------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion docs/source/api_doc/archive/supported_types.demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
if __name__ == '__main__':
columns = ['Format', 'Extension Name']
rows = []
for key, (exts, _, _) in sorted(_KNOWN_ARCHIVE_TYPES.items()):
for key, (exts, _, _, _) in sorted(_KNOWN_ARCHIVE_TYPES.items()):
rows.append((key, ', '.join(f'``{v}``' for v in exts)))

df = pd.DataFrame(columns=columns, data=rows)
Expand Down
11 changes: 6 additions & 5 deletions hfutils/archive/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
.. warning::
The creation of archive files in the RAR format is not supported, as we utilize the `rarfile <https://github.com/markokr/rarfile>`_ library, which does not offer functionality for creating RAR files.
"""
from .base import register_archive_type, archive_pack, archive_unpack, get_archive_type, get_archive_extname
from .rar import _rar_pack, _rar_unpack
from .sevenz import _7z_pack, _7z_unpack
from .tar import _tarfile_pack, _tarfile_unpack
from .zip import _zip_pack, _zip_unpack
from .base import register_archive_type, archive_pack, archive_unpack, get_archive_type, get_archive_extname, \
archive_writer, ArchiveWriter
from .rar import _rar_pack, _rar_unpack, RARWriter
from .sevenz import _7z_pack, _7z_unpack, SevenZWriter
from .tar import _tarfile_pack, _tarfile_unpack, TarWriter
from .zip import _zip_pack, _zip_unpack, ZipWriter
135 changes: 128 additions & 7 deletions hfutils/archive/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,101 @@
import warnings
from typing import List, Dict, Tuple, Callable, Optional

_KNOWN_ARCHIVE_TYPES: Dict[str, Tuple[List[str], Callable, Callable]] = {}

class ArchiveWriter:
    """
    Base class for archive writers.

    An archive writer wraps a format-specific handler object and exposes a small,
    uniform interface (:meth:`open`, :meth:`add`, :meth:`close`) that can also be
    used as a context manager. Subclasses provide the format-specific parts by
    overriding :meth:`_create_handler` and :meth:`_add_file`.

    :param archive_file: The path to the archive file to be created or modified.
    :type archive_file: str
    """

    def __init__(self, archive_file: str):
        """
        Store the target path; the underlying handler is not created until :meth:`open`.

        :param archive_file: The path to the archive file to be created or modified.
        :type archive_file: str
        """
        self.archive_file = archive_file
        # ``None`` means "not open". Set by open(), cleared by close().
        self._handler = None

    def _create_handler(self):
        """
        Create the handler for the archive writer.

        This method should be overridden by subclasses to provide specific
        handler creation logic for different archive types. The returned object
        must expose a ``close()`` method, since :meth:`close` calls it.

        :raises NotImplementedError: If not overridden in a subclass.
        """
        raise NotImplementedError  # pragma: no cover

    def _add_file(self, filename: str, arcname: str):
        """
        Add a file to the archive.

        This method should be overridden by subclasses to define how files
        are added to the archive for different formats.

        :param filename: The path to the file to add to the archive.
        :type filename: str
        :param arcname: The archive name for the file.
        :type arcname: str
        :raises NotImplementedError: If not overridden in a subclass.
        """
        raise NotImplementedError  # pragma: no cover

    def open(self):
        """
        Open the archive for writing.

        Creates the handler via :meth:`_create_handler` if there is none yet;
        calling it again while already open does nothing.
        """
        if self._handler is None:
            self._handler = self._create_handler()

    def add(self, filename: str, arcname: str):
        """
        Add a file to the archive.

        Delegates to :meth:`_add_file` and returns whatever it returns.

        :param filename: The path to the file to add.
        :type filename: str
        :param arcname: The name to use for the file within the archive.
        :type arcname: str
        """
        return self._add_file(filename, arcname)

    def close(self):
        """
        Close the archive.

        Calls ``close()`` on the handler if one exists. The handler reference is
        dropped even when that call raises, so the writer never keeps pointing at
        a handler whose close has already been attempted. Calling this method on
        a writer that is not open does nothing.
        """
        # Detach first: if handler.close() raises, the exception still propagates,
        # but a later close()/__exit__ will not touch the broken handler again and
        # open() is able to create a fresh one.
        handler, self._handler = self._handler, None
        if handler is not None:
            handler.close()

    def __enter__(self):
        """
        Enter the runtime context related to this object.

        Opens the archive for writing and returns the writer itself.
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit the runtime context related to this object.

        Closes the archive, ensuring that resources are released. Returns ``None``,
        so an exception raised inside the ``with`` body is not suppressed.
        """
        self.close()


# Signature of a writer factory: takes the archive file path, returns an ArchiveWriter.
_FN_WRITER = Callable[[str], ArchiveWriter]
# Registry of archive types, keyed by type name.
# Each value is a 4-tuple: (extension names, pack function, unpack function, writer factory).
_KNOWN_ARCHIVE_TYPES: Dict[str, Tuple[List[str], Callable, Callable, _FN_WRITER]] = {}


def register_archive_type(name: str, exts: List[str], fn_pack: Callable, fn_unpack: Callable, fn_writer: _FN_WRITER):
"""
Register a custom archive type with associated file extensions and packing/unpacking functions.
Expand All @@ -32,6 +123,8 @@ def register_archive_type(name: str, exts: List[str], fn_pack: Callable, fn_unpa
:type fn_pack: Callable
:param fn_unpack: The unpacking function that takes an archive filename and a directory as input and extracts the archive.
:type fn_unpack: Callable
:param fn_writer: The writer creation function that takes an archive filename and creates an archive writer object.
:type fn_writer: Callable[[str], ArchiveWriter]
:raises ValueError: If no file extensions are provided for the archive type.
Example:
Expand All @@ -45,7 +138,7 @@ def register_archive_type(name: str, exts: List[str], fn_pack: Callable, fn_unpa
"""
if len(exts) == 0:
raise ValueError(f'At least one extension name for archive type {name!r} should be provided.')
_KNOWN_ARCHIVE_TYPES[name] = (exts, fn_pack, fn_unpack)
_KNOWN_ARCHIVE_TYPES[name] = (exts, fn_pack, fn_unpack, fn_writer)


def get_archive_extname(type_name: str) -> str:
Expand All @@ -65,7 +158,7 @@ def get_archive_extname(type_name: str) -> str:
'.zip'
"""
if type_name in _KNOWN_ARCHIVE_TYPES:
exts, _, _ = _KNOWN_ARCHIVE_TYPES[type_name]
exts, _, _, _ = _KNOWN_ARCHIVE_TYPES[type_name]
return exts[0]
else:
raise ValueError(f'Unknown archive type - {type_name!r}.')
Expand Down Expand Up @@ -95,7 +188,7 @@ def archive_pack(type_name: str, directory: str, archive_file: str,
Example:
>>> archive_pack('zip', '/path/to/directory', '/path/to/archive.zip', pattern='*.txt')
"""
exts, fn_pack, _ = _KNOWN_ARCHIVE_TYPES[type_name]
exts, fn_pack, _, _ = _KNOWN_ARCHIVE_TYPES[type_name]
if not any(os.path.normcase(archive_file).endswith(extname) for extname in exts):
warnings.warn(f'The archive type {type_name!r} should be one of the {exts!r}, '
f'but file name {archive_file!r} is assigned. '
Expand All @@ -122,7 +215,7 @@ def get_archive_type(archive_file: str) -> str:
'gztar'
"""
archive_file = os.path.normcase(archive_file)
for type_name, (exts, _, _) in _KNOWN_ARCHIVE_TYPES.items():
for type_name, (exts, _, _, _) in _KNOWN_ARCHIVE_TYPES.items():
if any(archive_file.endswith(extname) for extname in exts):
return type_name

Expand All @@ -149,5 +242,33 @@ def archive_unpack(archive_file: str, directory: str, silent: bool = False, pass
>>> archive_unpack('/path/to/archive.zip', '/path/to/extract')
"""
type_name = get_archive_type(archive_file)
_, _, fn_unpack = _KNOWN_ARCHIVE_TYPES[type_name]
_, _, fn_unpack, _ = _KNOWN_ARCHIVE_TYPES[type_name]
return fn_unpack(archive_file, directory, silent=silent, password=password)


def archive_writer(type_name: str, archive_file: str) -> ArchiveWriter:
    """
    Create an ArchiveWriter instance for the specified archive type.

    Looks up the writer factory registered for ``type_name`` and calls it with
    ``archive_file``. If the file name does not end with one of the extension
    names registered for that type, a warning is emitted but the writer is
    still created.

    :param type_name: The name of the archive type.
    :type type_name: str
    :param archive_file: The filename of the archive to be created or modified.
    :type archive_file: str
    :return: An ArchiveWriter instance for the specified archive type.
    :rtype: ArchiveWriter
    :raises ValueError: If the archive type is not registered.

    Example:
        >>> writer = archive_writer('zip', '/path/to/archive.zip')
        >>> with writer as w:
        ...     w.add('/path/to/file.txt', 'file.txt')
    """
    # Fail with the documented ValueError rather than a bare KeyError from the lookup.
    if type_name not in _KNOWN_ARCHIVE_TYPES:
        raise ValueError(f'Unknown archive type - {type_name!r}.')

    exts, _, _, fn_writer = _KNOWN_ARCHIVE_TYPES[type_name]
    # Normalise once; the result is the same for every extension being checked.
    normalized_name = os.path.normcase(archive_file)
    if not any(normalized_name.endswith(extname) for extname in exts):
        warnings.warn(f'The archive type {type_name!r} should be one of the {exts!r}, '
                      f'but file name {archive_file!r} is assigned. '
                      f'We strongly recommend using a regular extension name for the archive file.')

    return fn_writer(archive_file)
59 changes: 57 additions & 2 deletions hfutils/archive/rar.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,69 @@
import os
from typing import Optional

from .base import register_archive_type
from .base import register_archive_type, ArchiveWriter

try:
import rarfile
except ImportError: # pragma: no cover
rarfile = None


class RARWriter(ArchiveWriter):
    """
    Stand-in writer for the RAR format.

    Creating RAR archives is not supported, so this class exists only so that the
    RAR type can be registered with a writer like every other archive type.
    Constructing it always fails.

    :param archive_file: Path to the RAR archive file.
    :type archive_file: str
    :raises RuntimeError: Always raised as RAR writing is not supported.
    """

    def __init__(self, archive_file: str):
        """
        Reject construction of a RAR writer.

        The base initializer runs first, then the error is raised unconditionally.

        :param archive_file: Path to the RAR archive file.
        :type archive_file: str
        :raises RuntimeError: Always raised as RAR writing is not supported.
        """
        super().__init__(archive_file)
        raise RuntimeError('RAR format writing is not supported.')

    def _create_handler(self):
        """
        Unimplemented hook for creating a RAR file handler.

        :raises NotImplementedError: Always raised as RAR writing is not supported.
        """
        raise NotImplementedError  # pragma: no cover

    def _add_file(self, filename: str, arcname: str):
        """
        Unimplemented hook for adding a file to a RAR archive.

        :param filename: Path to the file to add.
        :type filename: str
        :param arcname: Name to give the file in the archive.
        :type arcname: str
        :raises NotImplementedError: Always raised as RAR writing is not supported.
        """
        raise NotImplementedError  # pragma: no cover


def _rar_pack(directory, zip_file, pattern: Optional[str] = None, silent: bool = False, clear: bool = False):
"""
Placeholder function for RAR packing (not supported).
This function exists for API completeness but is not implemented as RAR
packing is not supported by the underlying library.
:param directory: The directory to pack.
:type directory: str or os.PathLike
:param zip_file: The output RAR file.
:type zip_file: str or os.PathLike
:param pattern: Optional pattern for file selection.
:type pattern: str, optional
:param silent: If True, suppress output. Defaults to False.
Expand All @@ -41,8 +90,14 @@ def _rar_unpack(rar_file, directory, silent: bool = False, password: Optional[st
"""
Unpack a RAR file to a specified directory.
This function extracts all contents of a RAR archive to the specified directory.
It supports password-protected archives and will create the target directory
if it doesn't exist.
:param rar_file: The RAR file to unpack.
:type rar_file: str or os.PathLike
:param directory: The directory to unpack the RAR file into.
:type directory: str or os.PathLike
:param silent: If True, suppress output. Defaults to False.
:type silent: bool
:param password: Optional password for encrypted RAR files.
Expand All @@ -58,4 +113,4 @@ def _rar_unpack(rar_file, directory, silent: bool = False, password: Optional[st


if rarfile is not None:
register_archive_type('rar', ['.rar'], _rar_pack, _rar_unpack)
register_archive_type('rar', ['.rar'], _rar_pack, _rar_unpack, RARWriter)
Loading

0 comments on commit ae33c69

Please sign in to comment.