Roman/dry ingest pipeline step (#3203)
### Description
The main goal of this change was to reduce the duplicate code that each ingest pipeline step had to write in order to support both async and non-async execution.
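
Below is a minimal sketch of the shared pattern, assuming each step exposes a sync and an async callable (`self.process.run` / `self.process.run_async` in the real code). Class and function names are illustrative, not the actual `PipelineStep` implementation: each concrete step now implements only an async body, and the sync entry point drives it with `asyncio.run()`, branching on `asyncio.iscoroutinefunction()` to handle either kind of callable.

```python
import asyncio
from typing import Any, Callable, Optional


class StepSketch:
    """Illustrative stand-in for a pipeline step (not the real class)."""

    def __init__(self, sync_fn: Callable, async_fn: Callable):
        # Stand-ins for self.process.run / self.process.run_async.
        self.sync_fn = sync_fn
        self.async_fn = async_fn

    async def _run_async(self, fn: Callable, **kwargs: Any) -> Any:
        # The single body each concrete step implements: it works for both
        # sync and async callables by checking for a coroutine function.
        if not asyncio.iscoroutinefunction(fn):
            return fn(**kwargs)
        return await fn(**kwargs)

    def run(self, _fn: Optional[Callable] = None, **kwargs: Any) -> Any:
        # Sync entry point simply drives the async one.
        fn = _fn or self.sync_fn
        return asyncio.run(self._run_async(fn=fn, **kwargs))

    async def run_async(self, _fn: Optional[Callable] = None, **kwargs: Any) -> Any:
        fn = _fn or self.async_fn
        return await self._run_async(fn=fn, **kwargs)


if __name__ == "__main__":
    async def double_async(x: int) -> int:
        return x * 2

    step = StepSketch(sync_fn=lambda x: x * 2, async_fn=double_async)
    print(step.run(x=3))                     # 6, via the sync callable
    print(asyncio.run(step.run_async(x=3)))  # 6, via the async callable
```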

Additional bugs found and fixed:
* The ingest logger wasn't being instantiated correctly. It is now instantiated at the beginning of a pipeline run, as soon as the verbosity level can be determined (see the handler-guard note after the `logger.py` diff below).
* The `requires_dependencies` wrapper wasn't wrapping async functions correctly. This was fixed so that `asyncio.iscoroutinefunction()` is triggered correctly (a sketch follows this list).
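
A hedged sketch of the `requires_dependencies` fix: the real decorator in `unstructured` verifies installed packages and extras, which is stubbed out here with a hypothetical import check. The key point is that a coroutine function must be wrapped with an async wrapper, otherwise `asyncio.iscoroutinefunction()` reports `False` for the decorated callable and the async code path is never taken.

```python
import asyncio
from functools import wraps
from typing import Callable


def requires_dependencies_sketch(deps: list[str]) -> Callable:
    """Illustrative decorator; not the actual requires_dependencies implementation."""

    def decorator(func: Callable) -> Callable:
        def check_deps() -> None:
            # Placeholder for the real import/extras check.
            for dep in deps:
                __import__(dep)

        if asyncio.iscoroutinefunction(func):
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                check_deps()
                return await func(*args, **kwargs)

            return async_wrapper

        @wraps(func)
        def wrapper(*args, **kwargs):
            check_deps()
            return func(*args, **kwargs)

        return wrapper

    return decorator


@requires_dependencies_sketch(["json"])
async def fetch() -> str:
    return "ok"


# True only because the async branch returns an async wrapper.
assert asyncio.iscoroutinefunction(fetch)
print(asyncio.run(fetch()))
```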
rbiseck3 authored Jun 14, 2024
1 parent 29e64eb commit a6c09ec
Showing 14 changed files with 128 additions and 154 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,4 +1,4 @@
## 0.14.6-dev6
## 0.14.6-dev7

### Enhancements

2 changes: 1 addition & 1 deletion unstructured/__version__.py
@@ -1 +1 @@
__version__ = "0.14.6-dev6" # pragma: no cover
__version__ = "0.14.6-dev7" # pragma: no cover
4 changes: 3 additions & 1 deletion unstructured/ingest/v2/example.py
@@ -24,7 +24,9 @@
if __name__ == "__main__":
logger.info(f"Writing all content in: {work_dir.resolve()}")
Pipeline.from_configs(
context=ProcessorConfig(work_dir=str(work_dir.resolve()), tqdm=True),
context=ProcessorConfig(
work_dir=str(work_dir.resolve()), tqdm=True, reprocess=True, verbose=True
),
indexer_config=S3IndexerConfig(remote_url="s3://utic-dev-tech-fixtures/small-pdf-set/"),
downloader_config=S3DownloaderConfig(download_dir=download_path),
source_connection_config=S3ConnectionConfig(anonymous=True),
6 changes: 4 additions & 2 deletions unstructured/ingest/v2/logger.py
@@ -84,7 +84,8 @@ def redact_jsons(s: str) -> str:
try:
formatted_j = json.dumps(json.loads(j))
except json.JSONDecodeError:
formatted_j = json.dumps(ast.literal_eval(j))
lit = ast.literal_eval(j)
formatted_j = json.dumps(lit)
hidden_j = json.dumps(hide_sensitive_fields(json.loads(formatted_j)))
s = s.replace(j, hidden_j)
return s
@@ -112,7 +113,8 @@ def make_default_logger(level: int) -> Logger:
handler.name = "ingest_log_handler"
formatter = SensitiveFormatter("%(asctime)s %(processName)-10s %(levelname)-8s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
if handler.name not in [h.name for h in logger.handlers]:
logger.addHandler(handler)
logger.setLevel(level)
remove_root_handlers(logger)
return logger
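
Note on the handler guard added above: without checking existing handler names, calling the logger setup more than once (for example once per spawned process and again at pipeline start) attaches duplicate handlers and every record is emitted twice. A small self-contained sketch, with illustrative logger and handler names rather than the real ingest module:

```python
import logging


def make_demo_logger(level: int) -> logging.Logger:
    logger = logging.getLogger("demo_ingest")
    handler = logging.StreamHandler()
    handler.name = "ingest_log_handler"
    # Only attach the handler if one with the same name isn't already present.
    if handler.name not in [h.name for h in logger.handlers]:
        logger.addHandler(handler)
    logger.setLevel(level)
    return logger


make_demo_logger(logging.INFO)
make_demo_logger(logging.DEBUG)  # second call only adjusts the level
logging.getLogger("demo_ingest").info("logged once, not twice")
```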
22 changes: 12 additions & 10 deletions unstructured/ingest/v2/pipeline/interfaces.py
@@ -6,13 +6,13 @@
from functools import wraps
from pathlib import Path
from time import time
from typing import Any, Optional, TypeVar
from typing import Any, Callable, Optional, TypeVar

from tqdm import tqdm
from tqdm.asyncio import tqdm as tqdm_asyncio

from unstructured.ingest.v2.interfaces import BaseProcess, ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.logger import logger, make_default_logger

BaseProcessT = TypeVar("BaseProcessT", bound=BaseProcess)
iterable_input = list[dict[str, Any]]
@@ -98,7 +98,7 @@ def _wrap_mp(self, input_kwargs: dict) -> Any:

def _set_log_level(self, log_level: int):
# Set the log level for each spawned process when using multiprocessing pool
logger.setLevel(log_level)
make_default_logger(log_level)

@timed
def __call__(self, iterable: Optional[iterable_input] = None) -> Any:
@@ -113,15 +113,16 @@ def __call__(self, iterable: Optional[iterable_input] = None) -> Any:
return self.process_async(iterable=iterable)
return self.process_multiprocess(iterable=iterable)

def _run(self, *args, **kwargs: Any) -> Optional[Any]:
raise NotImplementedError
def _run(self, fn: Callable, **kwargs: Any) -> Optional[Any]:
return asyncio.run(self.run_async(_fn=fn, **kwargs))

async def _run_async(self, *args, **kwargs: Any) -> Optional[Any]:
async def _run_async(self, fn: Callable, **kwargs: Any) -> Optional[Any]:
raise NotImplementedError

def run(self, *args, **kwargs: Any) -> Optional[Any]:
def run(self, _fn: Optional[Callable] = None, **kwargs: Any) -> Optional[Any]:
try:
return self._run(*args, **kwargs)
fn = _fn or self.process.run
return self._run(fn=fn, **kwargs)
except Exception as e:
logger.error(f"Exception raised while running {self.identifier}", exc_info=e)
if "file_data_path" in kwargs:
@@ -130,9 +131,10 @@ def run(self, *args, **kwargs: Any) -> Optional[Any]:
raise e
return None

async def run_async(self, *args, **kwargs: Any) -> Optional[Any]:
async def run_async(self, _fn: Optional[Callable] = None, **kwargs: Any) -> Optional[Any]:
try:
return await self._run_async(*args, **kwargs)
fn = _fn or self.process.run_async
return await self._run_async(fn=fn, **kwargs)
except Exception as e:
logger.error(f"Exception raised while running {self.identifier}", exc_info=e)
if "file_data_path" in kwargs:
4 changes: 2 additions & 2 deletions unstructured/ingest/v2/pipeline/pipeline.py
@@ -5,7 +5,7 @@
from typing import Any, Optional, Union

from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.logger import logger, make_default_logger
from unstructured.ingest.v2.pipeline.steps.chunk import Chunker, ChunkStep
from unstructured.ingest.v2.pipeline.steps.download import DownloaderT, DownloadStep
from unstructured.ingest.v2.pipeline.steps.embed import Embedder, EmbedStep
@@ -59,7 +59,7 @@ def __post_init__(
stager: UploadStager = None,
uploader: Uploader = None,
):
logger.setLevel(level=logging.DEBUG if self.context.verbose else logging.INFO)
make_default_logger(level=logging.DEBUG if self.context.verbose else logging.INFO)
self.indexer_step = IndexStep(process=indexer, context=self.context)
self.downloader_step = DownloadStep(process=downloader, context=self.context)
self.partitioner_step = PartitionStep(process=partitioner, context=self.context)
30 changes: 11 additions & 19 deletions unstructured/ingest/v2/pipeline/steps/chunk.py
@@ -1,8 +1,9 @@
import asyncio
import hashlib
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, TypedDict
from typing import Callable, Optional, TypedDict

from unstructured.ingest.v2.interfaces import FileData
from unstructured.ingest.v2.logger import logger
@@ -53,32 +54,23 @@ def _save_output(self, output_filepath: str, chunked_content: list[dict]):
logger.debug(f"Writing chunker output to: {output_filepath}")
json.dump(chunked_content, f, indent=2)

def _run(self, path: str, file_data_path: str) -> ChunkStepResponse:
async def _run_async(
self, fn: Callable, path: str, file_data_path: str, **kwargs
) -> ChunkStepResponse:
path = Path(path)
file_data = FileData.from_file(path=file_data_path)
output_filepath = self.get_output_filepath(filename=path)
if not self.should_chunk(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping chunking, output already exists: {output_filepath}")
return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))
chunked_content_raw = self.process.run(elements_filepath=path)
self._save_output(
output_filepath=str(output_filepath),
chunked_content=elements_to_dicts(chunked_content_raw),
)
return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))

async def _run_async(self, path: str, file_data_path: str) -> ChunkStepResponse:
path = Path(path)
file_data = FileData.from_file(path=file_data_path)
output_filepath = self.get_output_filepath(filename=path)
if not self.should_chunk(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping chunking, output already exists: {output_filepath}")
return ChunkStepResponse(file_data_path=file_data_path, path=str(output_filepath))
if semaphore := self.context.semaphore:
fn_kwargs = {"elements_filepath": path}
if not asyncio.iscoroutinefunction(fn):
chunked_content_raw = fn(**fn_kwargs)
elif semaphore := self.context.semaphore:
async with semaphore:
chunked_content_raw = await self.process.run_async(elements_filepath=path)
chunked_content_raw = await fn(**fn_kwargs)
else:
chunked_content_raw = await self.process.run_async(elements_filepath=path)
chunked_content_raw = await fn(**fn_kwargs)
self._save_output(
output_filepath=str(output_filepath),
chunked_content=elements_to_dicts(chunked_content_raw),
52 changes: 21 additions & 31 deletions unstructured/ingest/v2/pipeline/steps/download.py
@@ -1,7 +1,8 @@
import asyncio
import hashlib
import json
from dataclasses import dataclass
from typing import Optional, TypedDict, TypeVar
from typing import Callable, Optional, TypedDict, TypeVar

from unstructured.ingest.v2.interfaces import FileData, download_responses
from unstructured.ingest.v2.interfaces.downloader import Downloader
@@ -55,7 +56,7 @@ def should_download(self, file_data: FileData, file_data_path: str) -> bool:
if self.context.re_download:
return True
download_path = self.process.get_download_path(file_data=file_data)
if not download_path.exists():
if not download_path or not download_path.exists():
return True
if (
download_path.is_file()
@@ -69,6 +70,24 @@ def should_download(self, file_data: FileData, file_data_path: str) -> bool:
return True
return False

async def _run_async(self, fn: Callable, file_data_path: str) -> list[DownloadStepResponse]:
file_data = FileData.from_file(path=file_data_path)
download_path = self.process.get_download_path(file_data=file_data)
if not self.should_download(file_data=file_data, file_data_path=file_data_path):
logger.debug(f"Skipping download, file already exists locally: {download_path}")
return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]
fn_kwargs = {"file_data": file_data}
if not asyncio.iscoroutinefunction(fn):
download_results = fn(**fn_kwargs)
elif semaphore := self.context.semaphore:
async with semaphore:
download_results = await fn(**fn_kwargs)
else:
download_results = await fn(**fn_kwargs)
return self.create_step_results(
current_file_data_path=file_data_path, download_results=download_results
)

def create_step_results(
self, current_file_data_path: str, download_results: download_responses
) -> list[DownloadStepResponse]:
@@ -87,35 +106,6 @@ def create_step_results(
)
return download_step_results

def _run(self, file_data_path: str) -> list[DownloadStepResponse]:
file_data = FileData.from_file(path=file_data_path)
download_path = self.process.get_download_path(file_data=file_data)
if not self.should_download(file_data=file_data, file_data_path=file_data_path):
logger.debug(f"Skipping download, file already exists locally: {download_path}")
return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]

download_results = self.process.run(file_data=file_data)
return self.create_step_results(
current_file_data_path=file_data_path, download_results=download_results
)

async def _run_async(self, file_data_path: str) -> list[DownloadStepResponse]:
file_data = FileData.from_file(path=file_data_path)
download_path = self.process.get_download_path(file_data=file_data)
if download_path and not self.should_download(
file_data=file_data, file_data_path=file_data_path
):
logger.debug(f"Skipping download, file already exists locally: {download_path}")
return [DownloadStepResponse(file_data_path=file_data_path, path=str(download_path))]
if semaphore := self.context.semaphore:
async with semaphore:
download_results = await self.process.run_async(file_data=file_data)
else:
download_results = await self.process.run_async(file_data=file_data)
return self.create_step_results(
current_file_data_path=file_data_path, download_results=download_results
)

def persist_new_file_data(self, file_data: FileData) -> str:
record_hash = self.get_hash(extras=[file_data.identifier])
filename = f"{record_hash}.json"
29 changes: 9 additions & 20 deletions unstructured/ingest/v2/pipeline/steps/embed.py
@@ -1,8 +1,9 @@
import asyncio
import hashlib
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, TypedDict
from typing import Callable, Optional, TypedDict

from unstructured.ingest.v2.interfaces import FileData
from unstructured.ingest.v2.logger import logger
@@ -53,33 +54,21 @@ def _save_output(self, output_filepath: str, embedded_content: list[dict]):
logger.debug(f"Writing embedded output to: {output_filepath}")
json.dump(embedded_content, f, indent=2)

def _run(self, path: str, file_data_path: str) -> EmbedStepResponse:
path = Path(path)
file_data = FileData.from_file(path=file_data_path)

output_filepath = self.get_output_filepath(filename=path)
if not self.should_embed(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping embedding, output already exists: {output_filepath}")
return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
embed_content_raw = self.process.run(elements_filepath=path)
self._save_output(
output_filepath=str(output_filepath),
embedded_content=elements_to_dicts(embed_content_raw),
)
return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))

async def _run_async(self, path: str, file_data_path: str) -> EmbedStepResponse:
async def _run_async(self, fn: Callable, path: str, file_data_path: str) -> EmbedStepResponse:
path = Path(path)
file_data = FileData.from_file(path=file_data_path)
output_filepath = self.get_output_filepath(filename=path)
if not self.should_embed(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping embedding, output already exists: {output_filepath}")
return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
if semaphore := self.context.semaphore:
fn_kwargs = {"elements_filepath": path}
if not asyncio.iscoroutinefunction(fn):
embed_content_raw = fn(**fn_kwargs)
elif semaphore := self.context.semaphore:
async with semaphore:
embed_content_raw = await self.process.run_async(elements_filepath=path)
embed_content_raw = await fn(**fn_kwargs)
else:
embed_content_raw = await self.process.run_async(elements_filepath=path)
embed_content_raw = await fn(**fn_kwargs)

self._save_output(
output_filepath=str(output_filepath),
33 changes: 11 additions & 22 deletions unstructured/ingest/v2/pipeline/steps/partition.py
@@ -1,8 +1,9 @@
import asyncio
import hashlib
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, TypedDict
from typing import Callable, Optional, TypedDict

from unstructured.ingest.v2.interfaces import FileData
from unstructured.ingest.v2.logger import logger
@@ -48,35 +49,23 @@ def _save_output(self, output_filepath: str, partitioned_content: list[dict]):
logger.debug(f"Writing partitioned output to: {output_filepath}")
json.dump(partitioned_content, f, indent=2)

def _run(self, path: str, file_data_path: str) -> PartitionStepResponse:
async def _run_async(
self, fn: Callable, path: str, file_data_path: str
) -> PartitionStepResponse:
path = Path(path)
file_data = FileData.from_file(path=file_data_path)
output_filepath = self.get_output_filepath(filename=Path(file_data_path))
if not self.should_partition(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping partitioning, output already exists: {output_filepath}")
return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))
partitioned_content = self.process.run(filename=path, metadata=file_data.metadata)
self._save_output(
output_filepath=str(output_filepath), partitioned_content=partitioned_content
)
return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))

async def _run_async(self, path: str, file_data_path: str) -> PartitionStepResponse:
path = Path(path)
file_data = FileData.from_file(path=file_data_path)
output_filepath = self.get_output_filepath(filename=Path(file_data_path))
if not self.should_partition(filepath=output_filepath, file_data=file_data):
logger.debug(f"Skipping partitioning, output already exists: {output_filepath}")
return PartitionStepResponse(file_data_path=file_data_path, path=str(output_filepath))
if semaphore := self.context.semaphore:
fn_kwargs = {"filename": path, "metadata": file_data.metadata}
if not asyncio.iscoroutinefunction(fn):
partitioned_content = fn(**fn_kwargs)
elif semaphore := self.context.semaphore:
async with semaphore:
partitioned_content = await self.process.run_async(
filename=path, metadata=file_data.metadata
)
partitioned_content = await fn(**fn_kwargs)
else:
partitioned_content = await self.process.run_async(
filename=path, metadata=file_data.metadata
)
partitioned_content = await fn(**fn_kwargs)
self._save_output(
output_filepath=str(output_filepath), partitioned_content=partitioned_content
)