Skip to content

Commit

Permalink
Simplify logging (#23)
Browse files Browse the repository at this point in the history
Fixes issues related to overwriting the root logger. Simplifies the logging
approach by following best practice: a custom logger in each module, with
settings inherited from the root logger.
  • Loading branch information
clane9 authored Feb 23, 2024
1 parent ed182cc commit b87d9fd
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 87 deletions.
16 changes: 5 additions & 11 deletions elbow/builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
from elbow.record import RecordBatch
from elbow.sinks import BufferedParquetWriter
from elbow.typing import StrOrPath
from elbow.utils import atomicopen, cpu_count, setup_logging
from elbow.utils import atomicopen, cpu_count

logger = logging.getLogger(__name__)


def build_table(
Expand Down Expand Up @@ -51,7 +53,6 @@ def build_table(
extract=extract,
workers=workers,
max_failures=max_failures,
log_level=logging.getLogger().level,
)

results = _run_pool(_worker, workers, worker_id)
Expand All @@ -66,10 +67,7 @@ def _build_table_worker(
extract: Extractor,
workers: int,
max_failures: Optional[int],
log_level: int,
):
setup_logging(log_level)

if isinstance(source, str):
source = iglob(source, recursive=True)

Expand Down Expand Up @@ -140,7 +138,6 @@ def build_parquet(
max_failures=max_failures,
path_column=path_column,
mtime_column=mtime_column,
log_level=logging.getLogger().level,
)

_run_pool(_worker, workers, worker_id)
Expand All @@ -157,10 +154,7 @@ def _build_parquet_worker(
max_failures: Optional[int],
path_column: str,
mtime_column: str,
log_level: int,
):
setup_logging(log_level)

start = datetime.now()
output = Path(output)
if isinstance(source, str):
Expand Down Expand Up @@ -209,7 +203,7 @@ def _check_workers(workers: Optional[int], worker_id: Optional[int]) -> Tuple[in

if not (worker_id is None or 0 <= worker_id < workers):
raise ValueError(
f"Invalid worker_id {worker_id}; expeced 0 <= worker_id < {workers}"
f"Invalid worker_id {worker_id}; expected 0 <= worker_id < {workers}"
)
return workers, worker_id

Expand All @@ -230,7 +224,7 @@ def _run_pool(
results.append(result)
except Exception as exc:
worker_id = futures_to_id[future]
logging.warning(
logger.warning(
"Generated exception in worker %d", worker_id, exc_info=exc
)

Expand Down
4 changes: 3 additions & 1 deletion elbow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

__all__ = ["ProcessCounts", "Pipeline"]

logger = logging.getLogger(__name__)


@dataclass
class ProcessCounts:
Expand Down Expand Up @@ -68,7 +70,7 @@ def run(self) -> ProcessCounts:
counts.success += 1

except Exception as exc:
logging.warning("Failed to process %s", path, exc_info=exc)
logger.warning("Failed to process %s", path, exc_info=exc)
counts.error += 1
if (
self.max_failures is not None
Expand Down
4 changes: 3 additions & 1 deletion elbow/sinks/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

__all__ = ["BufferedParquetWriter"]

logger = logging.getLogger(__name__)


class BufferedParquetWriter:
"""
Expand Down Expand Up @@ -105,7 +107,7 @@ def _flush(self, blocking: bool = True):
self._push_batch()

if self._future is not None and self._future.running():
logging.info("Waiting for previous batch to finish writing")
logger.info("Waiting for previous batch to finish writing")
self._future.result()

if self._table is not None:
Expand Down
64 changes: 1 addition & 63 deletions elbow/utils.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,13 @@
import logging
import os
import re
import sys
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Optional, Tuple, Union
from typing import Tuple, Union

StrOrPath = Union[str, Path]


class RepetitiveFilter(logging.Filter):
"""
Suppress similar log messages after a number of repeats.
"""

def __init__(self, max_repeats: int = 5):
self.max_repeats = max_repeats

self._counts: Dict[Tuple[str, int], int] = {}

def filter(self, record: logging.LogRecord):
key = record.pathname, record.lineno
count = self._counts.get(key, 0)
if count == self.max_repeats:
record.msg += " [future messages suppressed]"
self._counts[key] = count + 1
return count <= self.max_repeats


def setup_logging(
level: Optional[Union[int, str]] = None,
max_repeats: Optional[int] = 5,
log_path: Optional[StrOrPath] = None,
):
"""
Setup root logger.
"""
fmt = "[%(levelname)s %(asctime)s]: " "%(message)s"
formatter = logging.Formatter(fmt, datefmt="%y-%m-%d %H:%M:%S")

logger = logging.getLogger()
if level is not None:
logger.setLevel(level)

# clean up any pre-existing filters and handlers
for f in logger.filters:
logger.removeFilter(f)
for h in logger.handlers:
logger.removeHandler(h)

if max_repeats:
logger.addFilter(RepetitiveFilter(max_repeats=max_repeats))

stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logger.level)
logger.addHandler(stream_handler)

if log_path is not None:
file_handler = logging.FileHandler(log_path, mode="a")
file_handler.setFormatter(formatter)
file_handler.setLevel(logger.level)
logger.addHandler(file_handler)

# Redefining the root logger is not strictly best practice.
# https://stackoverflow.com/a/7430495
# But I want the convenience to just call e.g. `logging.info()`.
logging.root = logger # type: ignore


@contextmanager
def atomicopen(path: StrOrPath, mode: str = "w", **kwargs):
"""
Expand Down
4 changes: 2 additions & 2 deletions example/example.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import argparse
import logging
from pathlib import Path

import numpy as np
from PIL import Image

from elbow.builders import build_parquet
from elbow.utils import setup_logging


def main():
Expand Down Expand Up @@ -45,7 +45,7 @@ def main():
parser.add_argument("-v", "--verbose", help="Verbose logging.", action="store_true")

args = parser.parse_args()
setup_logging("INFO" if args.verbose else "ERROR")
logging.basicConfig(level="INFO" if args.verbose else "ERROR")

build_parquet(
source=str(args.root / "**" / "*.jpg"),
Expand Down
3 changes: 0 additions & 3 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +0,0 @@
from elbow.utils import setup_logging

setup_logging()
6 changes: 0 additions & 6 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@
DATA_DIR = Path(__file__).parent / "data"


def test_repetitive_filter():
# default logger already contains the repetitive filter
for ii in range(10):
logging.warning("Message: %d", ii)


def test_atomicopen(tmp_path: Path):
fname = tmp_path / "file.txt"
with ut.atomicopen(fname, "x") as f:
Expand Down

0 comments on commit b87d9fd

Please sign in to comment.