Add scripts for downloading data from polish court API (#2)
* Add scripts for downloading data from polish court API

* Refine logging

* Refactor to make mongo bulk writes

* Fix typing errors

* Add missing dependency

* Refine retries and log warning on invalid pl_court_api params

---------

Co-authored-by: Jakub Binkowski <[email protected]>
binkjakub and Jakub Binkowski authored Mar 8, 2024
1 parent 5834bfd commit 1e484bb
Showing 9 changed files with 241 additions and 5 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -149,4 +149,6 @@ checklink/cookies.txt

 # Quarto
 .quarto
-.history
+.history
+
+secrets.env
2 changes: 1 addition & 1 deletion juddges/_modidx.py
@@ -5,4 +5,4 @@
 'doc_host': 'https://laugustyniak.github.io',
 'git_url': 'https://github.com/laugustyniak/juddges',
 'lib_path': 'juddges'},
-'syms': {}}
+'syms': {'juddges.data.pl_court_api': {}}}
Empty file added juddges/data/__init__.py
43 changes: 43 additions & 0 deletions juddges/data/pl_court_api.py
@@ -0,0 +1,43 @@
from typing import Any

import requests
import xmltodict
from loguru import logger


class PolishCourtAPI:
    def __init__(self) -> None:
        self.url = "https://apiorzeczenia.wroclaw.sa.gov.pl/ncourt-api"

    def get_number_of_judgements(self, params: dict[str, Any] | None = None) -> int:
        if params is None:
            params = {}
        elif "limit" in params.keys():
            logger.warning("Setting limit to query the number of judgements has no effect!")

        params |= {"limit": 0}
        endpoint = f"{self.url}/judgements"
        res = requests.get(endpoint, params=params)
        res.raise_for_status()
        total_judgements = xmltodict.parse(res.content.decode("utf-8"))["judgements"]["@total"]

        return int(total_judgements)

    def get_judgements(self, params: dict[str, Any]) -> list[dict[str, Any]]:
        endpoint = f"{self.url}/judgements"
        res = requests.get(endpoint, params=params)
        res.raise_for_status()
        judgements = xmltodict.parse(res.content.decode("utf-8"))["judgements"]["judgement"]

        assert isinstance(judgements, list)

        return judgements

    def get_content(self, id: str) -> str:
        params = {"id": id}
        endpoint = f"{self.url}/judgement/content"
        res = requests.get(endpoint, params=params)
        res.raise_for_status()
        content = res.content.decode("utf-8")

        return content
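
A minimal usage sketch of the new client, for orientation only (not part of the commit); the parameter values are assumptions, and get_content simply returns the decoded response body:

# Illustrative sketch; parameter values are hypothetical.
from juddges.data.pl_court_api import PolishCourtAPI

api = PolishCourtAPI()
total = api.get_number_of_judgements()  # total number of judgements exposed by the API
page = api.get_judgements({"sort": "date-asc", "limit": 10, "offset": 0})  # one page of metadata records
content = api.get_content(id=page[0]["id"])  # raw content of one judgement; raises requests.HTTPError on 4xx/5xx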
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -4,6 +4,7 @@ extend-include = ["*.ipynb"]
 exclude = ["_modidx.py"]

 [tool.mypy]
-python_version = "3.10"
+python_version = "3.11"
 strict = true
+untyped_calls_exclude = ["pymongo"]
 plugins = "numpy.typing.mypy_plugin"
11 changes: 11 additions & 0 deletions requirements.txt
@@ -4,3 +4,14 @@ ruff==0.2.2
coverage==7.4.3
pytest==8.0.2
mypy==1.8.0


requests==2.31.0
pymongo==4.3.3
xmltodict==0.13.0
python-dotenv==1.0.1
rich==13.7.0
mpire==2.10.0
tenacity==8.2.3
loguru==0.7.2
typer==0.9.0
102 changes: 102 additions & 0 deletions scripts/download_pl_content.py
@@ -0,0 +1,102 @@
import math
from typing import Generator, Any

import typer
from dotenv import load_dotenv
from loguru import logger
from mpire.pool import WorkerPool
from pymongo import MongoClient, UpdateOne
from pymongo.cursor import Cursor
from pymongo.errors import BulkWriteError
from pymongo.server_api import ServerApi
from requests import HTTPError
from tenacity import retry, wait_random_exponential, retry_if_exception_type, stop_after_attempt

from juddges.data.pl_court_api import PolishCourtAPI

N_JOBS = 8
BATCH_SIZE = 100

load_dotenv("secrets.env", verbose=True)


def main(
    mongo_uri: str = typer.Option(..., envvar="MONGO_URI"),
    batch_size: int = typer.Option(BATCH_SIZE),
    n_jobs: int = typer.Option(N_JOBS),
) -> None:
    client: MongoClient[dict[str, Any]] = MongoClient(mongo_uri, server_api=ServerApi("1"))
    collection = client["juddges"]["judgements"]
    client.admin.command("ping")

    query = {"content": {"$exists": False}}
    num_docs_without_content = collection.count_documents(query)
    logger.info(f"There are {num_docs_without_content} documents without content")

    cursor = collection.find(query, batch_size=batch_size)

    docs_to_update = yield_batches(cursor, batch_size)
    download_content = ContentDownloader(mongo_uri)
    with WorkerPool(n_jobs=n_jobs) as pool:
        pool.map_unordered(
            download_content,
            docs_to_update,
            progress_bar=True,
            iterable_len=math.ceil(num_docs_without_content / batch_size),
        )


class ContentDownloader:
    def __init__(self, mongo_uri: str):
        self.mongo_uri = mongo_uri

    def __call__(self, *doc_ids: str) -> None:
        data_batch: list[UpdateOne] = []

        for d_id in doc_ids:
            content = self._download_content(d_id)
            data_batch.append(UpdateOne({"_id": d_id}, {"$set": {"content": content}}))

        client: MongoClient[dict[str, Any]] = MongoClient(self.mongo_uri)
        collection = client["juddges"]["judgements"]

        try:
            collection.bulk_write(data_batch)
        except BulkWriteError as err:
            logger.error(err)

    @retry(
        wait=wait_random_exponential(multiplier=1, min=4, max=30),
        retry=retry_if_exception_type(HTTPError),
        stop=stop_after_attempt(5),
    )
    def _download_content(self, doc_id: str) -> str | None:
        api = PolishCourtAPI()
        try:
            return api.get_content(doc_id)
        except HTTPError as err:
            if err.response.status_code == 404:
                logger.warning("Found no content for judgement {id}", id=doc_id)
                return None
            else:
                raise


def yield_batches(
    cursor: Cursor[dict[str, Any]], batch_size: int
) -> Generator[list[str], None, None]:
    """Generates batches of data from pymongo.Cursor.
    Credit: https://stackoverflow.com/a/61809417
    """

    batch: list[str] = []
    for i, row in enumerate(cursor):
        if i % batch_size == 0 and i > 0:
            yield batch
            del batch[:]
        batch.append(str(row["_id"]))
    yield batch


if __name__ == "__main__":
    typer.run(main)
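
A small sketch (not part of the commit) of how yield_batches feeds the pool: each yielded list of stringified _id values is unpacked by mpire into the positional *doc_ids of ContentDownloader.__call__; a plain iterator of dicts stands in for the pymongo cursor here:

fake_rows = iter([{"_id": i} for i in range(5)])  # stand-in for collection.find(...)
for batch in yield_batches(fake_rows, batch_size=2):  # type: ignore[arg-type]
    print(batch)  # ['0', '1'], then ['2', '3'], then ['4']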
79 changes: 79 additions & 0 deletions scripts/download_pl_metadata.py
@@ -0,0 +1,79 @@
from typing import Optional, Any

import typer
from dotenv import load_dotenv
from mpire.pool import WorkerPool
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
from pymongo.server_api import ServerApi
from requests import HTTPError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential
from loguru import logger

from juddges.data.pl_court_api import PolishCourtAPI

N_JOBS = 8
BATCH_SIZE = 1_000

load_dotenv("secrets.env", verbose=True)


def main(
    mongo_uri: str = typer.Option(..., envvar="MONGO_URI"),
    batch_size: int = typer.Option(BATCH_SIZE),
    n_jobs: int = typer.Option(N_JOBS),
    limit: Optional[int] = typer.Option(None),
) -> None:
    api = PolishCourtAPI()
    total_judgements = api.get_number_of_judgements()
    logger.info(f"Total judgements found: {total_judgements}")

    if limit is not None:
        total_judgements = min(total_judgements, limit)

    offsets = list(range(0, total_judgements, batch_size))
    download_metadata = MetadataDownloader(mongo_uri, batch_size)
    with WorkerPool(n_jobs=n_jobs) as pool:
        pool.map_unordered(download_metadata, offsets, progress_bar=True)


class MetadataDownloader:
    def __init__(self, mongo_uri: str, batch_size: int):
        self.mongo_uri = mongo_uri
        self.batch_size = batch_size

    @retry(
        wait=wait_random_exponential(multiplier=1, min=4, max=30),
        retry=retry_if_exception_type(HTTPError),
        stop=stop_after_attempt(5),
    )
    def __call__(self, offset: int) -> None:
        client: MongoClient[dict[str, Any]] = MongoClient(self.mongo_uri, server_api=ServerApi("1"))
        collection = client["juddges"]["judgements"]

        params = {
            "sort": "date-asc",
            "limit": self.batch_size,
            "offset": offset,
        }
        api = PolishCourtAPI()
        judgements = api.get_judgements(params)

        for item in judgements:
            item["_id"] = item.pop("id")

        # ignore duplicated documents when they come up
        try:
            collection.insert_many(judgements, ordered=False)
        except BulkWriteError as err:
            if any(x["code"] != 11000 for x in err.details["writeErrors"]):
                raise
            else:
                logger.warning(
                    "found {num_duplicated} judgements duplicated",
                    num_duplicated=len(err.details["writeErrors"]),
                )


if __name__ == "__main__":
    typer.run(main)
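
To make the partitioning concrete, a sketch with assumed counts (illustrative only, not part of the commit):

total_judgements = 2_500  # assumed value for illustration
batch_size = 1_000
offsets = list(range(0, total_judgements, batch_size))
print(offsets)  # [0, 1000, 2000] -> one task per offset
# the worker handling offset=2000 calls the API with
# {"sort": "date-asc", "limit": 1000, "offset": 2000}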
2 changes: 0 additions & 2 deletions scripts/run_dummy.py

This file was deleted.
