From 6d055abeb75e00b15d97779c21d818afcd509263 Mon Sep 17 00:00:00 2001
From: Ismail Ashraq
Date: Tue, 21 May 2024 15:54:26 +0500
Subject: [PATCH 1/3] process large docs in batches

---
 semantic_chunkers/chunkers/statistical.py | 36 +++++++++++++++++++----
 1 file changed, 30 insertions(+), 6 deletions(-)

diff --git a/semantic_chunkers/chunkers/statistical.py b/semantic_chunkers/chunkers/statistical.py
index a6997ba..67a82f5 100644
--- a/semantic_chunkers/chunkers/statistical.py
+++ b/semantic_chunkers/chunkers/statistical.py
@@ -9,6 +9,8 @@ from semantic_chunkers.utils.text import tiktoken_length
 from semantic_chunkers.utils.logger import logger
 
+from tqdm.auto import tqdm
+
 
 @dataclass
 class ChunkStatistics:
@@ -62,7 +64,7 @@ def __init__(
         self.enable_statistics = enable_statistics
         self.statistics: ChunkStatistics
 
-    def __call__(self, docs: List[str]) -> List[List[Chunk]]:
+    def __call__(self, docs: List[str], batch_size: int = 500) -> List[List[Chunk]]:
         """Chunk documents into smaller chunks based on semantic similarity.
 
         :param docs: list of text documents to be split, if only wanted to
@@ -75,6 +77,8 @@ def __call__(self, docs: List[str]) -> List[List[Chunk]]:
 
         all_chunks = []
 
+        new_docs = []
+        # Split the docs that already exceed max_split_tokens to smaller chunks
         for doc in docs:
             token_count = tiktoken_length(doc)
             if token_count > self.max_split_tokens:
@@ -83,22 +87,42 @@ def __call__(self, docs: List[str]) -> List[List[Chunk]]:
                     f"of {self.max_split_tokens}. "
                     "Splitting to sentences before semantically merging."
                 )
-            splits = self._split(doc)
-            encoded_splits = self._encode_documents(splits)
+                splits = self._split(doc)
+                new_docs.extend(splits)
+            else:
+                new_docs.append(doc)
+
+        docs = [doc for doc in new_docs if doc and doc.strip()]
+
+        last_split = None
+        for i in tqdm(range(0, len(docs), batch_size), desc="Processing document batches"):
+            batch_docs = docs[i:i + batch_size]
+            if last_split is not None:
+                batch_docs = last_split.splits + batch_docs
+
+            encoded_splits = self._encode_documents(batch_docs)
             similarities = self._calculate_similarity_scores(encoded_splits)
             if self.dynamic_threshold:
-                self._find_optimal_threshold(splits, similarities)
+                self._find_optimal_threshold(batch_docs, similarities)
             else:
                 self.calculated_threshold = self.encoder.score_threshold
             split_indices = self._find_split_indices(similarities=similarities)
-            doc_chunks = self._split_documents(splits, split_indices, similarities)
+            doc_chunks = self._split_documents(batch_docs, split_indices, similarities)
+
+            if len(doc_chunks) > 1:
+                all_chunks.extend(doc_chunks[:-1])
+                last_split = doc_chunks[-1]
+            else:
+                last_split = doc_chunks[0]
 
             if self.plot_chunks:
                 self.plot_similarity_scores(similarities, split_indices, doc_chunks)
 
             if self.enable_statistics:
                 print(self.statistics)
-            all_chunks.append(doc_chunks)
+
+        if last_split:
+            all_chunks.append(last_split)
 
         return all_chunks
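
Note: the batching introduced above holds the final chunk of every batch back in `last_split` and prepends its sentence splits to the next batch, so a semantic chunk can keep growing across a batch boundary; only the final carry is flushed after the loop. A minimal standalone sketch of this carry-over pattern, with a hypothetical `merge_batch` callable standing in for the encode/threshold/split steps and each chunk represented as a plain list of splits:

    from typing import Callable, List

    def merge_in_batches(
        splits: List[str],
        batch_size: int,
        merge_batch: Callable[[List[str]], List[List[str]]],
    ) -> List[List[str]]:
        """Carry-over batching: merge_batch returns a list of chunks (each a
        list of splits; assumed non-empty, like _split_documents above). The
        trailing, possibly unfinished chunk of every batch is fed back into
        the next batch so merging can continue across batch boundaries."""
        chunks: List[List[str]] = []
        carry: List[str] = []  # splits of the unfinished trailing chunk
        for i in range(0, len(splits), batch_size):
            merged = merge_batch(carry + splits[i : i + batch_size])
            chunks.extend(merged[:-1])  # all but the trailing chunk are final
            carry = merged[-1]          # trailing chunk may continue next batch
        if carry:
            chunks.append(carry)
        return chunks

One consequence visible in the patch itself: when dynamic_threshold is enabled, _find_optimal_threshold is recomputed per batch, so chunk boundaries can differ slightly from a single whole-document pass.
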
From 283a9b044c516c65e75d896089c2612ab0185512 Mon Sep 17 00:00:00 2001
From: Ismail Ashraq
Date: Tue, 21 May 2024 16:27:30 +0500
Subject: [PATCH 2/3] linting

---
 semantic_chunkers/chunkers/statistical.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/semantic_chunkers/chunkers/statistical.py b/semantic_chunkers/chunkers/statistical.py
index 67a82f5..c5a8269 100644
--- a/semantic_chunkers/chunkers/statistical.py
+++ b/semantic_chunkers/chunkers/statistical.py
@@ -78,7 +78,7 @@ def __call__(self, docs: List[str], batch_size: int = 500) -> List[List[Chunk]]:
         all_chunks = []
 
         new_docs = []
-        # Split the docs that already exceed max_split_tokens to smaller chunks 
+        # Split the docs that already exceed max_split_tokens to smaller chunks
         for doc in docs:
             token_count = tiktoken_length(doc)
             if token_count > self.max_split_tokens:
@@ -92,11 +92,13 @@ def __call__(self, docs: List[str], batch_size: int = 500) -> List[List[Chunk]]:
             else:
                 new_docs.append(doc)
 
-        docs = [doc for doc in new_docs if doc and doc.strip()] 
+        docs = [doc for doc in new_docs if doc and doc.strip()]
 
         last_split = None
-        for i in tqdm(range(0, len(docs), batch_size), desc="Processing document batches"):
-            batch_docs = docs[i:i + batch_size]
+        for i in tqdm(
+            range(0, len(docs), batch_size), desc="Processing document batches"
+        ):
+            batch_docs = docs[i : i + batch_size]
             if last_split is not None:
                 batch_docs = last_split.splits + batch_docs
 
@@ -120,7 +122,7 @@ def __call__(self, docs: List[str], batch_size: int = 500) -> List[List[Chunk]]:
 
         if self.enable_statistics:
             print(self.statistics)
-        
+
         if last_split:
             all_chunks.append(last_split)
 
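
Note: with the two patches above applied, the public interface gains only the optional `batch_size` parameter (default 500 at this point; patch 3 below lowers it to 64). A usage sketch; the `StatisticalChunker` import path and the `OpenAIEncoder` dependency are assumptions taken from the surrounding semantic-chunkers and semantic-router projects, not from this diff:

    from semantic_router.encoders import OpenAIEncoder  # assumed encoder dependency
    from semantic_chunkers import StatisticalChunker  # assumed public import path

    encoder = OpenAIEncoder()  # expects OPENAI_API_KEY in the environment
    chunker = StatisticalChunker(encoder=encoder)

    # One list of Chunk objects per input document; sentence splits are
    # encoded and semantically merged in batches of `batch_size`.
    chunks_per_doc = chunker(
        ["first long document ...", "second long document ..."], batch_size=500
    )
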
From da4e594a5bfe8bd0673861a0b89ae0cb4e01177a Mon Sep 17 00:00:00 2001
From: Ismail Ashraq
Date: Sun, 26 May 2024 13:17:02 +0500
Subject: [PATCH 3/3] implement _chunk method

---
 semantic_chunkers/chunkers/statistical.py | 98 +++++++++++++++--------
 1 file changed, 63 insertions(+), 35 deletions(-)

diff --git a/semantic_chunkers/chunkers/statistical.py b/semantic_chunkers/chunkers/statistical.py
index c5a8269..7cf9f9b 100644
--- a/semantic_chunkers/chunkers/statistical.py
+++ b/semantic_chunkers/chunkers/statistical.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass
-from typing import List
+from typing import Any, List
 
 import numpy as np
 
@@ -64,55 +64,55 @@ def __init__(
         self.enable_statistics = enable_statistics
         self.statistics: ChunkStatistics
 
-    def __call__(self, docs: List[str], batch_size: int = 500) -> List[List[Chunk]]:
-        """Chunk documents into smaller chunks based on semantic similarity.
+    def _chunk(
+        self, splits: List[Any], batch_size: int = 64, enforce_max_tokens: bool = False
+    ) -> List[Chunk]:
+        """Merge splits into chunks using semantic similarity, with optional enforcement of maximum token limits per chunk.
 
-        :param docs: list of text documents to be split, if only wanted to
-            split a single document, pass it as a list with a single element.
+        :param splits: Splits to be merged into chunks.
+        :param batch_size: Number of splits to process in one batch.
+        :param enforce_max_tokens: If True, further split chunks that exceed the maximum token limit.
 
-        :return: list of DocumentChunk objects containing the split documents.
+        :return: List of chunks.
         """
-        if not docs:
-            raise ValueError("At least one document is required for splitting.")
-
-        all_chunks = []
-
-        new_docs = []
         # Split the docs that already exceed max_split_tokens to smaller chunks
-        for doc in docs:
-            token_count = tiktoken_length(doc)
-            if token_count > self.max_split_tokens:
-                logger.info(
-                    f"Single document exceeds the maximum token limit "
-                    f"of {self.max_split_tokens}. "
-                    "Splitting to sentences before semantically merging."
-                )
-                splits = self._split(doc)
-                new_docs.extend(splits)
-            else:
-                new_docs.append(doc)
+        if enforce_max_tokens:
+            new_splits = []
+            for split in splits:
+                token_count = tiktoken_length(split)
+                if token_count > self.max_split_tokens:
+                    logger.info(
+                        f"Single document exceeds the maximum token limit "
+                        f"of {self.max_split_tokens}. "
+                        "Splitting to sentences before semantically merging."
+                    )
+                    _splits = self._split(split)
+                    new_splits.extend(_splits)
+                else:
+                    new_splits.append(split)
 
-        docs = [doc for doc in new_docs if doc and doc.strip()]
+            splits = [split for split in new_splits if split and split.strip()]
 
+        chunks = []
         last_split = None
-        for i in tqdm(
-            range(0, len(docs), batch_size), desc="Processing document batches"
-        ):
-            batch_docs = docs[i : i + batch_size]
+        for i in tqdm(range(0, len(splits), batch_size)):
+            batch_splits = splits[i : i + batch_size]
             if last_split is not None:
-                batch_docs = last_split.splits + batch_docs
+                batch_splits = last_split.splits + batch_splits
 
-            encoded_splits = self._encode_documents(batch_docs)
+            encoded_splits = self._encode_documents(batch_splits)
             similarities = self._calculate_similarity_scores(encoded_splits)
             if self.dynamic_threshold:
-                self._find_optimal_threshold(batch_docs, similarities)
+                self._find_optimal_threshold(batch_splits, similarities)
             else:
                 self.calculated_threshold = self.encoder.score_threshold
             split_indices = self._find_split_indices(similarities=similarities)
-            doc_chunks = self._split_documents(batch_docs, split_indices, similarities)
+            doc_chunks = self._split_documents(
+                batch_splits, split_indices, similarities
+            )
 
             if len(doc_chunks) > 1:
-                all_chunks.extend(doc_chunks[:-1])
+                chunks.extend(doc_chunks[:-1])
                 last_split = doc_chunks[-1]
             else:
                 last_split = doc_chunks[0]
@@ -124,8 +124,36 @@ def __call__(self, docs: List[str], batch_size: int = 500) -> List[List[Chunk]]:
             print(self.statistics)
 
         if last_split:
-            all_chunks.append(last_split)
+            chunks.append(last_split)
 
+        return chunks
+
+    def __call__(self, docs: List[str], batch_size: int = 64) -> List[List[Chunk]]:
+        """Split documents into smaller chunks based on semantic similarity.
+
+        :param docs: list of text documents to be split, if only wanted to
+            split a single document, pass it as a list with a single element.
+
+        :return: list of Chunk objects containing the split documents.
+        """
+        if not docs:
+            raise ValueError("At least one document is required for splitting.")
+
+        all_chunks = []
+        for doc in docs:
+            token_count = tiktoken_length(doc)
+            if token_count > self.max_split_tokens:
+                logger.info(
+                    f"Single document exceeds the maximum token limit "
+                    f"of {self.max_split_tokens}. "
+                    "Splitting to sentences before semantically merging."
+                )
+            if isinstance(doc, str):
+                splits = self._split(doc)
+                doc_chunks = self._chunk(splits, batch_size=batch_size)
+                all_chunks.append(doc_chunks)
+            else:
+                raise ValueError("The document must be a string.")
         return all_chunks
 
     def _encode_documents(self, docs: List[str]) -> np.ndarray:
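
Note: after this refactor, `__call__` splits each document into sentences via `_split` and delegates the batched semantic merging to `_chunk`, returning one list of chunks per input document. The new `enforce_max_tokens` flag makes `_chunk` re-split any input split that itself exceeds `max_split_tokens`; `__call__` leaves it at its `False` default, relying on `_split` having already produced sentence-level splits. A hedged sketch of driving `_chunk` directly, assuming an already-constructed chunker instance:

    # `chunker` is an already-constructed statistical chunker instance.
    splits = chunker._split("Some very long document ...")  # sentence-level splits

    # Merge splits semantically in batches of 64; additionally re-split any
    # split exceeding max_split_tokens before merging (a step the __call__
    # wrapper above does not enable).
    chunks = chunker._chunk(splits, batch_size=64, enforce_max_tokens=True)
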