-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
760 additions
and
76 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
from collections.abc import Generator | ||
from contextlib import AbstractContextManager | ||
from typing import Any, Self, TypedDict, TypeVar | ||
|
||
from attrs import define, field | ||
from httpx import URL, Client | ||
|
||
from .data_models import Base | ||
|
||
T = TypeVar("T") | ||
# Note: `TGen` on its own is equivalent to `TGen[Any]`. | ||
TGen = Generator[T, None, None] | ||
TBase = TypeVar("TBase", bound=Base) | ||
|
||
JSONDict = dict[str, Any] | ||
|
||
|
||
class PageMeta(TypedDict): | ||
size: int | ||
offset: int | ||
limit: int | ||
|
||
|
||
class ResponseJSON(TypedDict): | ||
metadata: PageMeta | ||
result: list[JSONDict] | ||
|
||
|
||
@define | ||
class PaginatedResponse: | ||
client: "ExportClient" | ||
table: str | ||
page_meta: PageMeta | ||
items: list[JSONDict] | ||
|
||
@property | ||
def is_last_page(self): | ||
return self.page_meta["size"] < self.page_meta["limit"] | ||
|
||
@classmethod | ||
def from_json( | ||
cls, client: "ExportClient", table: str, resp_json: ResponseJSON | ||
) -> Self: | ||
return cls(client, table, resp_json["metadata"], resp_json["result"]) | ||
|
||
def __iter__(self) -> TGen[JSONDict]: | ||
yield from self.items | ||
|
||
def iter_all(self) -> TGen[JSONDict]: | ||
yield from self.items | ||
if self.is_last_page: | ||
return | ||
limit = self.page_meta["limit"] | ||
offset = self.page_meta["offset"] + limit | ||
nextpage = self.client._get_data_export(self.table, limit=limit, offset=offset) | ||
yield from nextpage.iter_all() | ||
|
||
|
||
@define | ||
class ExportClient(AbstractContextManager): | ||
base_url: URL = field(converter=URL) | ||
auth_token: str | ||
_cached_client: Client | None = None | ||
|
||
@property | ||
def _client(self) -> Client: | ||
if self._cached_client is None: | ||
headers = { | ||
"Accept": "application/json", | ||
"Authorization": f"Bearer {self.auth_token}", | ||
} | ||
self._cached_client = Client(headers=headers) | ||
return self._cached_client | ||
|
||
def close(self): | ||
if self._cached_client: | ||
self._cached_client.close() | ||
self._cached_client = None | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
self.close() | ||
|
||
def get_faqmatches(self, **kw) -> PaginatedResponse: | ||
return self._get_data_export("faqmatches", **kw) | ||
|
||
def get_model_items(self, model: type[TBase], **kw) -> TGen[TBase]: | ||
paginated_items = self._get_data_export(model.__tablename__, **kw) | ||
for item in paginated_items.iter_all(): | ||
yield model.from_json(item) | ||
|
||
def _get_data_export( | ||
self, table: str, limit: int = 1000, offset: int = 0 | ||
) -> PaginatedResponse: | ||
params = {"limit": limit, "offset": offset} | ||
resp = self._client.get(self.base_url.join(table), params=params) | ||
resp.raise_for_status() | ||
return PaginatedResponse.from_json(self, table, resp.json()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from collections.abc import Iterable, Iterator | ||
from typing import Generic, TypeVar | ||
|
||
T = TypeVar("T") | ||
|
||
|
||
class IteratorWithFinishedCheck(Generic[T]): | ||
""" | ||
An iterator that knows if it's reached its end. | ||
""" | ||
|
||
_iter: Iterator[T] | ||
_next_item: T | ||
_finished: bool = False | ||
|
||
@property | ||
def finished(self) -> bool: | ||
return self._finished | ||
|
||
def __init__(self, iterable: Iterable[T]): | ||
self._iter = iter(iterable) | ||
self._set_next() | ||
|
||
def __iter__(self): | ||
return self | ||
|
||
def __next__(self) -> T: | ||
if self._finished: | ||
raise StopIteration | ||
next_item = self._next_item | ||
self._set_next() | ||
return next_item | ||
|
||
def _set_next(self): | ||
try: | ||
self._next_item = next(self._iter) | ||
except StopIteration: | ||
self._finished = True | ||
|
||
def peek_next(self) -> T: | ||
if self.finished: | ||
raise StopIteration | ||
return self._next_item |
Oops, something went wrong.