diff --git a/src/event_server/storage.py b/src/event_server/storage.py
index 3ab1f2b..6e1af66 100644
--- a/src/event_server/storage.py
+++ b/src/event_server/storage.py
@@ -1,8 +1,11 @@
+from abc import abstractmethod
 import base64
 from datetime import datetime
+import itertools
 from logging import warn
 from pathlib import Path
-from typing import Union
+from typing import TextIO, Generator, Self, Union
+import zipfile
 
 from pydantic import UUID1, ValidationError
 
@@ -10,6 +13,114 @@
 from .model.event import Event
 
 
+def zip_lt_monkeypatch(self: zipfile.Path, other: zipfile.Path):
+    """Order two ``zipfile.Path`` objects by their ``filename``."""
+    return self.filename < other.filename
+
+
+# sorted() is applied to zipfile.Path entries when an archived year is
+# listed (YearPartition.subpartitions), which requires ``__lt__``.
+zipfile.Path.__lt__ = zip_lt_monkeypatch
+
+
+class DayPartition:
+    """A single ``<month>-<day>.jsonl`` file of events."""
+
+    month: int
+    day: int
+    path: Path
+
+    def __init__(self, path: Path) -> None:
+        self.path = path
+        # The file stem encodes the date as "<month>-<day>".
+        self.month, self.day = map(int, path.stem.split("-"))
+
+    def open(self) -> TextIO:
+        """Open the partition file for reading."""
+        return self.path.open()
+
+
+class YearPartition:
+    """The day partitions of one year; the year is parsed from the path."""
+
+    path: Path
+    year: int
+
+    def __init__(self, path) -> None:
+        self.path = path
+        self.year = int(path.name)
+
+    def subpartitions(self) -> Generator[DayPartition, None, None]:
+        """Yield this year's ``*.jsonl`` day partitions in sorted order."""
+        for day_partition in sorted(self.path.glob("*.jsonl")):
+            yield DayPartition(day_partition)
+
+    @classmethod
+    def all(cls, path: Path) -> Generator[Self, None, None]:
+        """Yield every year partition directly under *path*, sorted.
+
+        Directories become ``OpenYearPartition`` and ``*.zip`` files become
+        ``ArchivedYearPartition``; zip files whose stem is not an integer
+        are skipped.
+        """
+        for subpath in sorted(path.iterdir()):
+            if subpath.is_dir():
+                yield OpenYearPartition(subpath)
+            elif subpath.suffix == ".zip":
+                try:
+                    partition = ArchivedYearPartition(subpath)
+                except ValueError:
+                    # int() signals a non-numeric stem with ValueError.
+                    continue
+                yield partition
+
+
+class OpenYearPartition(YearPartition):
+    """A year partition stored as a plain directory."""
+
+    @classmethod
+    def all(cls, path: Path) -> Generator[Self, None, None]:
+        """Yield a partition for each directory directly under *path*."""
+        for subpath in sorted(path.iterdir()):
+            if subpath.is_dir():
+                yield cls(subpath)
+
+
+class ArchivedYearPartition(YearPartition):
+    """A year partition stored as a ``<year>.zip`` archive."""
+
+    def __init__(self, path) -> None:
+        self._path = path
+        self.year = int(path.stem)
+        # The archive is only opened on first access to ``path``.
+        self._zip = None
+
+    @property
+    def path(self):
+        """Return the ``zipfile.Path`` that contains the day files."""
+        if self._zip is None:
+            # Open the archive once and share it between both lookups.
+            archive = zipfile.ZipFile(self._path)
+            self._zip = zipfile.Path(archive)
+            content = list(self._zip.iterdir())
+            # Descend when everything sits inside a single "<year>/" entry.
+            if len(content) == 1 and content[0].name == str(self.year):
+                self._zip = zipfile.Path(archive, at=f"{self.year}/")
+
+        return self._zip
+
+    @classmethod
+    def all(cls, path: Path) -> Generator[Self, None, None]:
+        """Yield a partition for each ``<year>.zip`` directly under *path*."""
+        for archive in sorted(path.glob("*.zip")):
+            try:
+                partition = cls(archive)
+            except ValueError:
+                # int() signals a non-numeric stem with ValueError.
+                continue
+            yield partition
+
+
 class Storage:
     application: str
     account: UUID1
@@ -34,8 +145,8 @@ def is_empty(self) -> bool:
     def find_event(self, id) -> Event:
         if self.is_empty():
             raise KeyError(id)
-        for year_partition in sorted(self.path.iterdir()):
-            for day_partition in sorted(year_partition.glob("*.jsonl")):
+        for year_partition in YearPartition.all(self.path):
+            for day_partition in year_partition.subpartitions():
                 with day_partition.open() as jf:
                     for line in jf.readlines():
                         try:
@@ -51,19 +162,21 @@ def list(self, max: int = 100, since: Union[datetime, None] = None):
             return []
 
         events = []
-        for year_partition in sorted(self.path.iterdir()):
-            year = int(year_partition.name)
-            if since is not None and since.year > year:
+        for year_partition in YearPartition.all(self.path):
+            if since is not None and since.year > year_partition.year:
                 continue
-            for day_partition in sorted(year_partition.glob("*.jsonl")):
+            for day_partition in year_partition.subpartitions():
                 is_since_day = False
-                if since is not None and since.year == year:
-                    month, day = map(int, day_partition.name.split(".")[0].split("-"))
-                    if since.month > month or (
-                        since.month == month and since.day > day
+                if since is not None and since.year == year_partition.year:
+                    if since.month > day_partition.month or (
+                        since.month == day_partition.month
+                        and since.day > day_partition.day
                     ):
                         continue
-                    if since.month == month and since.day == day:
+                    if (
+                        since.month == day_partition.month
+                        and since.day == day_partition.day
+                    ):
                         is_since_day = True
                 with day_partition.open() as jf:
                     for line in jf.readlines():