diff --git a/elbow/builders.py b/elbow/builders.py
index 8b079c1..2db2f54 100644
--- a/elbow/builders.py
+++ b/elbow/builders.py
@@ -15,7 +15,9 @@
 from elbow.record import RecordBatch
 from elbow.sinks import BufferedParquetWriter
 from elbow.typing import StrOrPath
-from elbow.utils import atomicopen, cpu_count, setup_logging
+from elbow.utils import atomicopen, cpu_count
+
+logger = logging.getLogger(__name__)
 
 
 def build_table(
@@ -51,7 +53,6 @@ def build_table(
         extract=extract,
         workers=workers,
         max_failures=max_failures,
-        log_level=logging.getLogger().level,
     )
 
     results = _run_pool(_worker, workers, worker_id)
@@ -66,10 +67,7 @@ def _build_table_worker(
     extract: Extractor,
     workers: int,
     max_failures: Optional[int],
-    log_level: int,
 ):
-    setup_logging(log_level)
-
     if isinstance(source, str):
         source = iglob(source, recursive=True)
 
@@ -140,7 +138,6 @@
         max_failures=max_failures,
         path_column=path_column,
        mtime_column=mtime_column,
-        log_level=logging.getLogger().level,
     )
 
     _run_pool(_worker, workers, worker_id)
@@ -157,10 +154,7 @@
     max_failures: Optional[int],
     path_column: str,
     mtime_column: str,
-    log_level: int,
 ):
-    setup_logging(log_level)
-
     start = datetime.now()
     output = Path(output)
     if isinstance(source, str):
@@ -209,7 +203,7 @@ def _check_workers(workers: Optional[int], worker_id: Optional[int]) -> Tuple[in
 
     if not (worker_id is None or 0 <= worker_id < workers):
         raise ValueError(
-            f"Invalid worker_id {worker_id}; expeced 0 <= worker_id < {workers}"
+            f"Invalid worker_id {worker_id}; expected 0 <= worker_id < {workers}"
         )
 
     return workers, worker_id
@@ -230,7 +224,7 @@ def _run_pool(
             results.append(result)
         except Exception as exc:
             worker_id = futures_to_id[future]
-            logging.warning(
+            logger.warning(
                 "Generated exception in worker %d", worker_id, exc_info=exc
             )
 
diff --git a/elbow/pipeline.py b/elbow/pipeline.py
index c9e3a36..c8ab7ec 100644
--- a/elbow/pipeline.py
+++ b/elbow/pipeline.py
@@ -11,6 +11,8 @@
 
 __all__ = ["ProcessCounts", "Pipeline"]
 
+logger = logging.getLogger(__name__)
+
 
 @dataclass
 class ProcessCounts:
@@ -68,7 +70,7 @@
                 counts.success += 1
 
             except Exception as exc:
-                logging.warning("Failed to process %s", path, exc_info=exc)
+                logger.warning("Failed to process %s", path, exc_info=exc)
                 counts.error += 1
                 if (
                     self.max_failures is not None
diff --git a/elbow/sinks/parquet.py b/elbow/sinks/parquet.py
index 8007683..4432963 100644
--- a/elbow/sinks/parquet.py
+++ b/elbow/sinks/parquet.py
@@ -11,6 +11,8 @@
 
 __all__ = ["BufferedParquetWriter"]
 
+logger = logging.getLogger(__name__)
+
 
 class BufferedParquetWriter:
     """
@@ -105,7 +107,7 @@ def _flush(self, blocking: bool = True):
             self._push_batch()
 
         if self._future is not None and self._future.running():
-            logging.info("Waiting for previous batch to finish writing")
+            logger.info("Waiting for previous batch to finish writing")
             self._future.result()
 
         if self._table is not None:
diff --git a/elbow/utils.py b/elbow/utils.py
index 51268b3..8e3bb03 100644
--- a/elbow/utils.py
+++ b/elbow/utils.py
@@ -1,75 +1,13 @@
-import logging
 import os
 import re
-import sys
 import tempfile
 from contextlib import contextmanager
 from pathlib import Path
-from typing import Dict, Optional, Tuple, Union
+from typing import Tuple, Union
 
 StrOrPath = Union[str, Path]
 
 
-class RepetitiveFilter(logging.Filter):
-    """
-    Suppress similar log messages after a number of repeats.
-    """
-
-    def __init__(self, max_repeats: int = 5):
-        self.max_repeats = max_repeats
-
-        self._counts: Dict[Tuple[str, int], int] = {}
-
-    def filter(self, record: logging.LogRecord):
-        key = record.pathname, record.lineno
-        count = self._counts.get(key, 0)
-        if count == self.max_repeats:
-            record.msg += " [future messages suppressed]"
-        self._counts[key] = count + 1
-        return count <= self.max_repeats
-
-
-def setup_logging(
-    level: Optional[Union[int, str]] = None,
-    max_repeats: Optional[int] = 5,
-    log_path: Optional[StrOrPath] = None,
-):
-    """
-    Setup root logger.
-    """
-    fmt = "[%(levelname)s %(asctime)s]: " "%(message)s"
-    formatter = logging.Formatter(fmt, datefmt="%y-%m-%d %H:%M:%S")
-
-    logger = logging.getLogger()
-    if level is not None:
-        logger.setLevel(level)
-
-    # clean up any pre-existing filters and handlers
-    for f in logger.filters:
-        logger.removeFilter(f)
-    for h in logger.handlers:
-        logger.removeHandler(h)
-
-    if max_repeats:
-        logger.addFilter(RepetitiveFilter(max_repeats=max_repeats))
-
-    stream_handler = logging.StreamHandler(sys.stdout)
-    stream_handler.setFormatter(formatter)
-    stream_handler.setLevel(logger.level)
-    logger.addHandler(stream_handler)
-
-    if log_path is not None:
-        file_handler = logging.FileHandler(log_path, mode="a")
-        file_handler.setFormatter(formatter)
-        file_handler.setLevel(logger.level)
-        logger.addHandler(file_handler)
-
-    # Redefining the root logger is not strictly best practice.
-    # https://stackoverflow.com/a/7430495
-    # But I want the convenience to just call e.g. `logging.info()`.
-    logging.root = logger  # type: ignore
-
-
 @contextmanager
 def atomicopen(path: StrOrPath, mode: str = "w", **kwargs):
     """
diff --git a/example/example.py b/example/example.py
index 50baec1..649edf0 100644
--- a/example/example.py
+++ b/example/example.py
@@ -1,11 +1,11 @@
 import argparse
+import logging
 from pathlib import Path
 
 import numpy as np
 from PIL import Image
 
 from elbow.builders import build_parquet
-from elbow.utils import setup_logging
 
 
 def main():
@@ -45,7 +45,7 @@
     parser.add_argument("-v", "--verbose", help="Verbose logging.", action="store_true")
     args = parser.parse_args()
 
-    setup_logging("INFO" if args.verbose else "ERROR")
+    logging.basicConfig(level="INFO" if args.verbose else "ERROR")
 
     build_parquet(
         source=str(args.root / "**" / "*.jpg"),
diff --git a/tests/__init__.py b/tests/__init__.py
index 2baae06..e69de29 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,3 +0,0 @@
-from elbow.utils import setup_logging
-
-setup_logging()
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 49ca9c0..3576cc7 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -9,12 +9,6 @@
 DATA_DIR = Path(__file__).parent / "data"
 
 
-def test_repetitive_filter():
-    # default logger already contains the repetitive filter
-    for ii in range(10):
-        logging.warning("Message: %d", ii)
-
-
 def test_atomicopen(tmp_path: Path):
     fname = tmp_path / "file.txt"
     with ut.atomicopen(fname, "x") as f: