Skip to content

Commit

Permalink
dev(narugo): save the arrange code
Browse files Browse the repository at this point in the history
  • Loading branch information
narugo1992 committed Nov 27, 2024
1 parent 8a3c435 commit f22c6e0
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 1 deletion.
31 changes: 31 additions & 0 deletions docs/source/api_doc/utils/arrange.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
hfutils.utils.arrange
=================================

.. currentmodule:: hfutils.utils.arrange

.. automodule:: hfutils.utils.arrange


FileItem
---------------------------

.. autoclass:: FileItem
:members: file, size, count, from_file



FilesGroup
---------------------------

.. autoclass:: FilesGroup
:members: files, size, count, new, add



walk_files_with_groups
---------------------------

.. autofunction:: walk_files_with_groups



1 change: 1 addition & 0 deletions docs/source/api_doc/utils/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ hfutils.utils
:maxdepth: 3

archive
arrange
binary
data
heap
Expand Down
126 changes: 125 additions & 1 deletion hfutils/utils/arrange.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
"""
A module for managing and grouping files based on size and structure.
This module provides functionality for walking through directories, grouping files based on
various criteria, and managing file collections with size constraints. It's particularly
useful for tasks involving file organization, batch processing, and storage management.
Example usage:
>>> groups = walk_files_with_groups("./data", pattern="*.txt", max_total_size="1GB")
>>> for group in groups:
... print(f"Group size: {group.size}, File count: {group.count}")
"""

import os
import pathlib
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Union, Optional

from hbutils.scale import size_to_bytes
from natsort import natsorted

from .heap import Heap
Expand All @@ -12,12 +26,35 @@

@dataclass
class FileItem:
"""
A data class representing a single file with its properties.
:param file: Path to the file
:type file: str
:param size: Size of the file in bytes
:type size: int
:param count: Number of files this item represents (typically 1)
:type count: int
"""

file: str
size: int
count: int

@classmethod
def from_file(cls, file: str, rel_to: Optional[str] = None) -> 'FileItem':
"""
Create a FileItem instance from a file path.
:param file: Path to the file
:type file: str
:param rel_to: Optional path to make the file path relative to
:type rel_to: Optional[str]
:return: A new FileItem instance
:rtype: FileItem
:raises FileNotFoundError: If the file does not exist
"""
file = pathlib.Path(file).resolve(strict=True)
size = os.path.getsize(str(file))
if rel_to:
Expand All @@ -33,19 +70,46 @@ def from_file(cls, file: str, rel_to: Optional[str] = None) -> 'FileItem':

@dataclass
class FilesGroup:
"""
A data class representing a group of files with collective properties.
:param files: List of file paths in the group
:type files: List[str]
:param size: Total size of all files in the group
:type size: int
:param count: Total number of files in the group
:type count: int
"""

files: List[str]
size: int
count: int

@classmethod
def new(cls) -> 'FilesGroup':
    """
    Construct an empty group: no files, zero total size, zero file count.

    Useful as a neutral starting point that items can then be merged into
    via :meth:`add`.

    :return: A freshly initialized, empty FilesGroup
    :rtype: FilesGroup
    """
    empty_group = cls(files=[], size=0, count=0)
    return empty_group

def add(self, file: Union[FileItem, 'FilesGroup']) -> 'FilesGroup':
"""
Add a FileItem or another FilesGroup to this group.
:param file: The item to add to the group
:type file: Union[FileItem, FilesGroup]
:return: Self reference for method chaining
:rtype: FilesGroup
:raises TypeError: If the input type is not FileItem or FilesGroup
"""
if isinstance(file, FileItem):
self.files.append(file.file)
self.size += file.size
Expand All @@ -61,10 +125,30 @@ def add(self, file: Union[FileItem, 'FilesGroup']) -> 'FilesGroup':


def _group_by_default(files: List[FileItem]) -> List[Union[FileItem, FilesGroup]]:
"""
Default grouping function that returns files as-is.
:param files: List of FileItem objects
:type files: List[FileItem]
:return: The same list of FileItems
:rtype: List[Union[FileItem, FilesGroup]]
"""
return files


def _group_by_segs(files: List[FileItem], segs: int) -> List[Union[FileItem, FilesGroup]]:
"""
Group files by their path segments.
:param files: List of FileItem objects
:type files: List[FileItem]
:param segs: Number of path segments to use for grouping
:type segs: int
:return: List of grouped files
:rtype: List[Union[FileItem, FilesGroup]]
"""
d = defaultdict(FilesGroup.new)
for file in files:
d[pathlib.Path(file.file).parts[:segs]].add(file)
Expand All @@ -77,6 +161,19 @@ def _group_by_segs(files: List[FileItem], segs: int) -> List[Union[FileItem, Fil

def _group_by(files: List[FileItem], group_method: Optional[Union[str, int]] = None) \
-> List[Union[FileItem, FilesGroup]]:
"""
Group files according to the specified method.
:param files: List of FileItem objects to group
:type files: List[FileItem]
:param group_method: Method for grouping (None for default, int for segment count)
:type group_method: Optional[Union[str, int]]
:return: List of grouped files
:rtype: List[Union[FileItem, FilesGroup]]
:raises TypeError: If group_method is of an unsupported type
:raises ValueError: If group_method is invalid or unsupported
"""
if isinstance(group_method, int):
pass # is an int
elif isinstance(group_method, str):
Expand All @@ -102,12 +199,39 @@ def _group_by(files: List[FileItem], group_method: Optional[Union[str, int]] = N

def walk_files_with_groups(directory: str, pattern: Optional[str] = None,
group_method: Optional[Union[str, int]] = None,
max_total_size: Optional[float] = None) \
max_total_size: Optional[Union[str, float]] = None) \
-> List[FilesGroup]:
"""
Walk through a directory and group files based on specified criteria.
This function walks through a directory, collecting files that match the given pattern,
and groups them according to the specified method while respecting size constraints.
:param directory: Root directory to start walking from
:type directory: str
:param pattern: Optional glob pattern to filter files
:type pattern: Optional[str]
:param group_method: Method for grouping files (None for default, int for segment count)
:type group_method: Optional[Union[str, int]]
:param max_total_size: Maximum total size for each group (can be string like "1GB")
:type max_total_size: Optional[Union[str, float]]
:return: List of file groups
:rtype: List[FilesGroup]
:raises ValueError: If the grouping parameters are invalid
:raises OSError: If there are filesystem-related errors
Example:
>>> groups = walk_files_with_groups("./data", "*.txt", group_method=2, max_total_size="1GB")
>>> for group in groups:
... print(f"Group contains {group.count} files, total size: {group.size} bytes")
"""
all_items = [
FileItem.from_file(os.path.join(directory, file), rel_to=directory)
for file in walk_files(directory, pattern=pattern)
]
if max_total_size is not None and isinstance(max_total_size, str):
max_total_size = size_to_bytes(max_total_size)
if max_total_size is None:
final_group = FilesGroup.new()
for file_item in all_items:
Expand Down
14 changes: 14 additions & 0 deletions test/utils/test_arrange.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import tempfile

import pytest
from hbutils.scale import size_to_bytes

from hfutils.utils import walk_files, FileItem, FilesGroup, walk_files_with_groups

Expand Down Expand Up @@ -231,6 +232,19 @@ def test_group_by_size_threshold(self, complex_directory):
original_files = len(list(walk_files(complex_directory)))
assert total_files == original_files

def test_group_by_size_threshold_size_str(self, complex_directory):
    # Same scenario as test_group_by_size_threshold, except the limit is
    # supplied as a human-readable size string ('5kb') rather than bytes.
    groups = walk_files_with_groups(complex_directory, max_total_size='5kb')

    # The cap should force the files to be split across multiple groups.
    assert len(groups) > 1

    # Each group either respects the byte limit, or consists of a single
    # file too large to fit under it.
    limit = size_to_bytes('5kb')
    assert all(grp.size <= limit or grp.count == 1 for grp in groups)

    # Grouping must neither drop nor duplicate any file.
    grouped_total = sum(grp.count for grp in groups)
    assert grouped_total == len(list(walk_files(complex_directory)))

def test_group_by_directory_depth(self, complex_directory):
# Test grouping by directory depth (2 levels)
result = walk_files_with_groups(complex_directory, group_method=2, max_total_size=10000)
Expand Down
File renamed without changes.

0 comments on commit f22c6e0

Please sign in to comment.