Skip to content

Commit

Permalink
Merge pull request #53 from deepghs/dev/arrange
Browse files Browse the repository at this point in the history
dev(narugo): add arrange system for uploading really tons of files
  • Loading branch information
narugo1992 authored Nov 27, 2024
2 parents 548234e + d2c9d8c commit 34e0876
Show file tree
Hide file tree
Showing 9 changed files with 970 additions and 1 deletion.
31 changes: 31 additions & 0 deletions docs/source/api_doc/utils/arrange.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
hfutils.utils.arrange
=================================

.. currentmodule:: hfutils.utils.arrange

.. automodule:: hfutils.utils.arrange


FileItem
---------------------------

.. autoclass:: FileItem
:members: file, size, count, from_file



FilesGroup
---------------------------

.. autoclass:: FilesGroup
:members: files, size, count, new, add



walk_files_with_groups
---------------------------

.. autofunction:: walk_files_with_groups



15 changes: 15 additions & 0 deletions docs/source/api_doc/utils/heap.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
hfutils.utils.heap
=================================

.. currentmodule:: hfutils.utils.heap

.. automodule:: hfutils.utils.heap


Heap
---------------------------

.. autoclass:: Heap
:members: __init__, pop, push, peek, __len__, __bool__, is_empty, __repr__


2 changes: 2 additions & 0 deletions docs/source/api_doc/utils/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ hfutils.utils
:maxdepth: 3

archive
arrange
binary
data
heap
download
model
number
Expand Down
2 changes: 2 additions & 0 deletions hfutils/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from .archive import is_archive_or_compressed
from .arrange import FilesGroup, FileItem, walk_files_with_groups
from .binary import is_binary_file
from .data import is_data_file
from .download import download_file
from .heap import Heap
from .logging import ColoredFormatter
from .number import number_to_tag
from .path import hf_normpath, hf_fs_path, parse_hf_fs_path, HfFileSystemPath
Expand Down
255 changes: 255 additions & 0 deletions hfutils/utils/arrange.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
"""
A module for managing and grouping files based on size and structure.
This module provides functionality for walking through directories, grouping files based on
various criteria, and managing file collections with size constraints. It's particularly
useful for tasks involving file organization, batch processing, and storage management.
Example usage:
>>> groups = walk_files_with_groups("./data", pattern="*.txt", max_total_size="1GB")
>>> for group in groups:
... print(f"Group size: {group.size}, File count: {group.count}")
"""

import os
import pathlib
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Union, Optional

from hbutils.scale import size_to_bytes
from natsort import natsorted

from .heap import Heap
from .walk import walk_files


@dataclass
class FileItem:
    """
    Record describing one file: its path, its size and how many files it stands for.

    :param file: Path of the file, stored as a string
    :type file: str
    :param size: File size in bytes
    :type size: int
    :param count: Number of files represented by this record (:meth:`from_file` sets 1)
    :type count: int
    """

    file: str
    size: int
    count: int

    @classmethod
    def from_file(cls, file: str, rel_to: Optional[str] = None) -> 'FileItem':
        """
        Build a :class:`FileItem` by inspecting a file on disk.

        The path is resolved to an absolute path first and the size is read from
        the resolved location. When ``rel_to`` is given (and non-empty), the stored
        path is expressed relative to the resolved ``rel_to`` directory; otherwise
        the resolved absolute path is stored.

        :param file: Path to the file
        :type file: str
        :param rel_to: Optional base path the stored path should be relative to
        :type rel_to: Optional[str]
        :return: A new item with ``count`` set to 1
        :rtype: FileItem
        :raises FileNotFoundError: If ``file`` (or ``rel_to``) does not exist
        :raises ValueError: If the resolved file is not located under ``rel_to``
        """
        # strict=True makes a missing path fail here instead of later in getsize().
        resolved = pathlib.Path(file).resolve(strict=True)
        n_bytes = os.path.getsize(str(resolved))

        if rel_to:
            base = pathlib.Path(rel_to).resolve(strict=True)
            stored = resolved.relative_to(base)
        else:
            stored = resolved

        return cls(file=str(stored), size=n_bytes, count=1)


@dataclass
class FilesGroup:
    """
    Accumulator for a set of files together with their combined size and count.

    :param files: Paths of the files collected so far
    :type files: List[str]
    :param size: Sum of the sizes of all collected files
    :type size: int
    :param count: Number of collected files
    :type count: int
    """

    files: List[str]
    size: int
    count: int

    @classmethod
    def new(cls) -> 'FilesGroup':
        """
        Build an empty group (no files, zero size, zero count).

        :return: A fresh, empty group with its own file list
        :rtype: FilesGroup
        """
        return cls(files=[], size=0, count=0)

    def add(self, file: Union[FileItem, 'FilesGroup']) -> 'FilesGroup':
        """
        Merge a single file or a whole group into this group, in place.

        :param file: The item or group whose files, size and count are absorbed
        :type file: Union[FileItem, FilesGroup]
        :return: This group itself, so calls can be chained
        :rtype: FilesGroup
        :raises TypeError: If ``file`` is neither a FileItem nor a FilesGroup
        """
        if isinstance(file, FileItem):
            incoming = [file.file]
        elif isinstance(file, FilesGroup):
            incoming = file.files
        else:
            raise TypeError(f'Unknown type {type(file)!r} to add - {file!r}.')

        # Both accepted types expose ``size`` and ``count``, so the totals are
        # updated the same way regardless of which one was passed.
        self.files.extend(incoming)
        self.size += file.size
        self.count += file.count
        return self


def _group_by_default(files: List[FileItem]) -> List[Union[FileItem, FilesGroup]]:
    """
    Identity grouping: every file stays its own unit.

    The input list object itself is returned (no copy is made).

    :param files: Items to "group"
    :type files: List[FileItem]
    :return: ``files``, unchanged
    :rtype: List[Union[FileItem, FilesGroup]]
    """
    return files


def _group_by_segs(files: List[FileItem], segs: int) -> List[Union[FileItem, FilesGroup]]:
    """
    Bucket files by the leading ``segs`` components of their path.

    Files whose paths share the same ``pathlib.Path(...).parts[:segs]`` prefix are
    merged into one :class:`FilesGroup`. The resulting groups are returned in
    natural-sort order of that prefix.

    :param files: Items to bucket
    :type files: List[FileItem]
    :param segs: Number of path segments that form the bucket key
    :type segs: int
    :return: One group per distinct prefix, naturally sorted by prefix
    :rtype: List[Union[FileItem, FilesGroup]]
    """
    buckets = defaultdict(FilesGroup.new)
    for file in files:
        buckets[pathlib.Path(file.file).parts[:segs]].add(file)

    # Sort on the keys only. Natural-sort keys of two distinct prefixes can compare
    # equal (e.g. "01" vs "1"); sorting (key, group) pairs would then fall back to
    # comparing FilesGroup objects, which define no ordering and raise TypeError.
    return [buckets[key] for key in natsorted(buckets)]


def _group_by(files: List[FileItem], group_method: Optional[Union[str, int]] = None) \
        -> List[Union[FileItem, FilesGroup]]:
    """
    Dispatch to the grouping strategy selected by ``group_method``.

    * ``None``: no grouping, the items are returned as-is.
    * ``int`` (or a string that parses as an int): group by that many leading
      path segments.

    :param files: Items to group
    :type files: List[FileItem]
    :param group_method: Grouping selector (None for default, int for segment count)
    :type group_method: Optional[Union[str, int]]
    :return: The grouped units
    :rtype: List[Union[FileItem, FilesGroup]]
    :raises TypeError: If ``group_method`` is not None, an int or a str
    :raises ValueError: If the segment count is 0, or a string does not parse as an int
    """
    if group_method is None:
        return _group_by_default(files)

    if not isinstance(group_method, (int, str)):
        raise TypeError(f'Unknown group by method - {group_method!r}.')

    method = group_method
    if isinstance(method, str):
        try:
            method = int(method)
        except (TypeError, ValueError):
            pass  # not numeric; rejected below as an unsupported string method

    if not isinstance(method, int):
        raise ValueError(f'Unsupported group by method - {method!r}.')
    if method == 0:
        raise ValueError('Unable to group by 0 segments.')

    return _group_by_segs(files, segs=method)


def walk_files_with_groups(directory: str, pattern: Optional[str] = None,
                           group_method: Optional[Union[str, int]] = None,
                           max_total_size: Optional[Union[str, float]] = None) \
        -> List[FilesGroup]:
    """
    Collect the files under a directory and pack them into size-bounded groups.

    Every file yielded by ``walk_files`` becomes a :class:`FileItem` whose path is
    relative to ``directory``. What happens next depends on ``max_total_size``:

    * ``None``: all files are merged into one single group and ``group_method``
      is not consulted.
    * otherwise: files are first turned into units via ``group_method``, then each
      unit is placed into the currently smallest output group, opening a new
      output group whenever that placement would exceed ``max_total_size``. A unit
      that is larger than the limit on its own still gets a group to itself.

    :param directory: Root directory to walk
    :type directory: str
    :param pattern: Optional pattern forwarded to ``walk_files`` to filter files
    :type pattern: Optional[str]
    :param group_method: Method for forming units (None for per-file, int for segment count)
    :type group_method: Optional[Union[str, int]]
    :param max_total_size: Size limit per output group; strings such as "1GB" are
        converted with ``size_to_bytes``
    :type max_total_size: Optional[Union[str, float]]
    :return: The non-empty output groups, in creation order
    :rtype: List[FilesGroup]
    :raises ValueError: If the grouping parameters are invalid
    :raises OSError: If there are filesystem-related errors

    Example:
        >>> groups = walk_files_with_groups("./data", "*.txt", group_method=2, max_total_size="1GB")
        >>> for group in groups:
        ...     print(f"Group contains {group.count} files, total size: {group.size} bytes")
    """
    items = []
    for rel_path in walk_files(directory, pattern=pattern):
        items.append(FileItem.from_file(os.path.join(directory, rel_path), rel_to=directory))

    if isinstance(max_total_size, str):
        max_total_size = size_to_bytes(max_total_size)

    if max_total_size is None:
        # No limit: everything ends up in one group.
        everything = FilesGroup.new()
        for entry in items:
            everything.add(entry)
        return [everything]

    units: List[Union[FileItem, FilesGroup]] = _group_by(items, group_method=group_method)
    bins: List[FilesGroup] = []
    # The heap is keyed on (size, count); the loop relies on peek()/pop()
    # handing back the least-filled bin.
    smallest_first: Heap[FilesGroup] = Heap(key=lambda x: (x.size, x.count))
    for unit in units:
        if not smallest_first or (smallest_first.peek().size + unit.size) > max_total_size:
            # Even the least-filled bin cannot take this unit: open a new one.
            fresh = FilesGroup.new()
            smallest_first.push(fresh)
            bins.append(fresh)
        # Pop and re-push so the heap sees the bin's updated size.
        target = smallest_first.pop()
        target.add(unit)
        smallest_first.push(target)

    return [packed for packed in bins if packed.count > 0]
Loading

0 comments on commit 34e0876

Please sign in to comment.