From 5ea838d427c378f5a9b86dd16a9838e619cd3129 Mon Sep 17 00:00:00 2001 From: Albert Sawczyn Date: Mon, 2 Sep 2024 21:42:52 +0200 Subject: [PATCH] feat: final version of scraping --- docker/nsa/docker-compose.yml | 19 --- juddges/data/nsa/scraper.py | 223 +++++++++++----------------- requirements.txt | 2 + scripts/nsa/scrap_documents_list.py | 121 +++++++++++++++ 4 files changed, 211 insertions(+), 154 deletions(-) create mode 100644 scripts/nsa/scrap_documents_list.py diff --git a/docker/nsa/docker-compose.yml b/docker/nsa/docker-compose.yml index 26fb703..b2a8be0 100755 --- a/docker/nsa/docker-compose.yml +++ b/docker/nsa/docker-compose.yml @@ -6,24 +6,5 @@ services: volumes: - nsa_dbdata:/data/db - multiple-tor: - build: - context: ./multiple-tor - args: - SOCKET_START_PORT: 9000 - SOCKET_END_PORT: 9100 - CONTROL_START_PORT: 9900 - CONTROL_END_PORT: 10000 - dockerfile: ./Dockerfile - ports: - - 9000-9100:9000-9100 - - 9900-10000:9900-10000 - command: > - python3 main.py --num-tors 50 - - volumes: nsa_dbdata: - multitor_data: - - diff --git a/juddges/data/nsa/scraper.py b/juddges/data/nsa/scraper.py index 4e4cfff..04cf683 100644 --- a/juddges/data/nsa/scraper.py +++ b/juddges/data/nsa/scraper.py @@ -1,142 +1,90 @@ +import random import re +import time import mechanicalsoup -import typer from bs4 import BeautifulSoup -from mpire import WorkerPool -from tqdm import trange -from datetime import datetime, timedelta -import pymongo from loguru import logger +from requests import HTTPError, RequestException +from requests.adapters import HTTPAdapter +from retry import retry +from urllib3 import Retry -from juddges.utils.tor import TorClient - -DB_URI = "mongodb://localhost:27017/" - -START_DATE = "1981-01-01" -END_DATE = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d") - - -def main( - socket_port_start: int = typer.Option(9000), - config_port_start: int = typer.Option(9900), - n_jobs: int = typer.Option(1), -) -> None: - client = pymongo.MongoClient(DB_URI) - db = client["nsa"] - dates_col = db["dates"] - - done = [] - for record in dates_col.aggregate( - [{"$group": {"_id": {"start_date": "$start_date", "end_date": "$end_date"}}}] - ): - done.append((record["_id"]["start_date"], record["_id"]["end_date"])) - - logger.info(f"Found {len(done)} done dates in the database.") - - dates = generate_dates(START_DATE, END_DATE) - - start_end_dates = list(reversed(list(zip(dates, dates[1:])))) - start_end_dates = filter_done_dates(start_end_dates, done) - - worker_torclient_assign = [ - TorClient("", socket_port_start + i, config_port_start + i) for i in range(n_jobs) - ] - with WorkerPool( - n_jobs=n_jobs, - pass_worker_id=True, - shared_objects=worker_torclient_assign, - start_method="threading", - ) as pool: - for result in pool.map( - process_date_range, start_end_dates, progress_bar=True, chunk_size=1 - ): - dates_col.insert_many(result) - - -def generate_dates(start_date: str, end_date: str) -> list[str]: - date_format = "%Y-%m-%d" - start = datetime.strptime(start_date, date_format) - end = datetime.strptime(end_date, date_format) - - date_list = [] - current_date = start - while current_date <= end: - date_list.append(current_date.strftime(date_format)) - current_date += timedelta(days=1) - - return date_list - - -def filter_done_dates(dates: list[tuple[str, str]], done: list[tuple[str, str]]): - done_dates = set(map(tuple, done)) - return [date for date in dates if date not in done_dates] - - -def process_date_range( - worker_id: int, worker_torclient_assign: list[TorClient], start_date, end_date -): - tor_client = worker_torclient_assign[worker_id] - nsa_scraper = NSAScraper(tor_client) - documents = nsa_scraper.search_documents(start_date, end_date) - if documents: - success = [] - for page_id, document_ids in documents.items(): - page_success = "FOUND" if document_ids is not None else "ERROR: Redirected" - success.append( - { - "start_date": start_date, - "end_date": end_date, - "page_id": page_id, - "success": page_success, - "document_ids": document_ids, - } - ) - else: - success = [ - { - "start_date": start_date, - "end_date": end_date, - "page_id": None, - "success": "NO_DOCUMENTS", - } - ] - return success +class IncorrectNumberOfDocumentsFound(Exception): + pass -class NSAScraper: - def __init__(self, tor_client: TorClient | None = None) -> None: - self.browser = mechanicalsoup.StatefulBrowser(user_agent="MechanicalSoup") - if tor_client: - self.browser.session.proxies = tor_client.proxy_config - def search_documents(self, start_date, end_date): - response = self.browser.open("https://orzeczenia.nsa.gov.pl/cbo") - if response.status_code != 200: - raise Exception(f"Failed to open the website. Status code: {response.status_code}") +class IncorrectPage(Exception): + pass + +class NSAScraper: + def __init__(self, user_agent: str, proxy_config: dict[str, str] | None = None) -> None: + self.browser = mechanicalsoup.StatefulBrowser( + user_agent=user_agent, + requests_adapters={ + "https://": HTTPAdapter( + max_retries=Retry( + total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504, 403] + ) + ), + "http://": HTTPAdapter( + max_retries=Retry( + total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504, 403] + ) + ), + }, + ) + + if proxy_config: + self.browser.session.proxies = proxy_config + + @retry( + tries=15, + exceptions=(RequestException, HTTPError, IncorrectNumberOfDocumentsFound, IncorrectPage), + ) + def search_documents_for_date(self, date): + self._browser_open("https://orzeczenia.nsa.gov.pl/cbo") self.browser.select_form() - # browser["symbole"] = "648" - self.browser["odDaty"] = start_date - self.browser["doDaty"] = end_date - self.browser.submit_selected() - if self.any_documents_found(self.browser): - documents = self.retrieve_documents(self.browser) + self.browser["odDaty"] = date + self.browser["doDaty"] = date + self._browser_submit_selected() + if self._any_documents_found(self.browser): + documents = self._retrieve_documents() num_documents = sum(map(lambda x: len(x) if x else 0, documents.values())) - print(f"Found {num_documents} documents on {len(documents)} pages.") + logger.info(f"Found {num_documents} documents on {len(documents)} pages.") return documents else: - print("No documents found") + logger.info("No documents found") return None - def any_documents_found(self, browser: mechanicalsoup.StatefulBrowser) -> bool: + def _browser_open(self, url: str) -> None: + response = self.browser.open(url, verify=False, timeout=30) + self._post_call(response) + + def _browser_submit_selected(self) -> None: + response = self.browser.submit_selected(verify=False, timeout=30) + self._post_call(response) + + def _post_call(self, response) -> None: + response.raise_for_status() + # wait random from normal distribution + time_to_wait = random.normalvariate(1, 0.5) + time.sleep(time_to_wait if time_to_wait > 0 else 0) + if not self._correct_page(): + raise IncorrectPage(f"Incorrect page: {self.browser.page.text}") + + def _correct_page(self) -> bool: + title = "Centralna Baza Orzeczeń Sądów Administracyjnych" + return title in self.browser.page.text + + def _any_documents_found(self, browser: mechanicalsoup.StatefulBrowser) -> bool: warning_text = "Nie znaleziono orzeczeń spełniających podany warunek!" return warning_text not in browser.page.text - def retrieve_documents( - self, browser: mechanicalsoup.StatefulBrowser - ) -> dict[int, list[str] | None]: - page_links = browser.links(url_regex="^/cbo/find\?p=") + def _retrieve_documents(self) -> dict[int, list[str] | None]: + page_links = self.browser.links(url_regex="^/cbo/find\?p=") if not page_links: last_page = 1 else: @@ -144,20 +92,28 @@ def retrieve_documents( last_page = int(last_page_link.text) documents: dict[int, list[str] | None] = {} - for page_id in trange(1, last_page + 1, disable=last_page == 1): - browser.open(f"https://orzeczenia.nsa.gov.pl/cbo/find?p={page_id}") - - if browser.url.endswith(f"{page_id}"): - page_documents = self.find_documents_on_page(browser.page) - assert ( - 0 < len(page_documents) <= 10 - ), f"Page {page_id} has {len(page_documents)} documents" - documents[page_id] = page_documents - else: - documents[page_id] = None + for page_id in range(1, last_page + 1): + documents[page_id] = self._retrieve_documents_from_page(page_id) + return documents + + @retry( + tries=15, + exceptions=(RequestException, HTTPError, IncorrectNumberOfDocumentsFound, IncorrectPage), + ) + def _retrieve_documents_from_page(self, page_id: int) -> list[str] | None: + self._browser_open(f"https://orzeczenia.nsa.gov.pl/cbo/find?p={page_id}") + if self.browser.url.endswith(f"{page_id}"): + page_documents = self._find_documents_on_page(self.browser.page) + if not (0 < len(page_documents) <= 10): + raise IncorrectNumberOfDocumentsFound( + f"Page {page_id} has {len(page_documents)} documents. Page URL: {self.browser.url}." + ) + documents = page_documents + else: + documents = None return documents - def find_documents_on_page(self, page: BeautifulSoup) -> list[str]: + def _find_documents_on_page(self, page: BeautifulSoup) -> list[str]: all_links = page.find_all("a", href=True) pattern = re.compile(r"^/doc/[A-Za-z0-9]+$") @@ -171,6 +127,3 @@ def find_documents_on_page(self, page: BeautifulSoup) -> list[str]: filtered_links.append(href) return filtered_links - - -typer.run(main) diff --git a/requirements.txt b/requirements.txt index 1ecd6df..0791717 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,6 +35,8 @@ xmltodict==0.13.0 xlsxwriter==3.2.0 mechanicalsoup==1.3.0 tor-python-easy==0.1.7 +retry==0.9.2 +random-user-agent==1.0.1 # dev coverage==7.4.3 diff --git a/scripts/nsa/scrap_documents_list.py b/scripts/nsa/scrap_documents_list.py new file mode 100644 index 0000000..b224d8c --- /dev/null +++ b/scripts/nsa/scrap_documents_list.py @@ -0,0 +1,121 @@ +import random +from datetime import datetime, timedelta + +import pymongo +import typer +import urllib3 +from loguru import logger +from mpire import WorkerPool +from random_user_agent.user_agent import UserAgent + +from juddges.data.nsa.scraper import NSAScraper + +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +DB_URI = "mongodb://localhost:27017/" + +START_DATE = "1981-01-01" +END_DATE = datetime.now().strftime("%Y-%m-%d") + + +def main( + n_jobs: int = typer.Option(30), + proxy_address: str = typer.Option(...), + db_uri: str = typer.Option(DB_URI), + start_date: str = typer.Option(START_DATE), + end_date: str = typer.Option(END_DATE), +) -> None: + client = pymongo.MongoClient(db_uri) + db = client["nsa"] + dates_col = db["dates"] + errors_col = db["errors"] + + done = dates_col.find().distinct("date") + logger.info(f"Found {len(done)} done dates in the database.") + + dates = generate_dates(start_date, end_date) + + random.shuffle(dates) + dates = filter_done_dates(dates, done) + + success = 0 + error = 0 + with WorkerPool(n_jobs=n_jobs, shared_objects=proxy_address) as pool: + for result in pool.imap_unordered( + process_date, + dates, + progress_bar=True, + progress_bar_options={"smoothing": 0}, + chunk_size=1, + ): + assert len(result) == 1 + if "error" in result: + result["error"]["time_added"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + errors_col.insert_one(result["error"]) + error += 1 + elif "success" in result: + for r in result["success"]: + r["time_added"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + dates_col.insert_many(result["success"]) + success += 1 + else: + raise ValueError(f"Invalid result: {result}") + logger.info(f"Success: {success}, Error: {error}") + + +def generate_dates(start_date: str, end_date: str) -> list[str]: + date_format = "%Y-%m-%d" + start = datetime.strptime(start_date, date_format) + end = datetime.strptime(end_date, date_format) + + date_list = [] + current_date = start + while current_date <= end: + date_list.append(current_date.strftime(date_format)) + current_date += timedelta(days=1) + + return date_list + + +def filter_done_dates(dates: list[str], done: list[str]) -> list[str]: + done_dates = set(done) + return [date for date in dates if date not in done_dates] + + +def process_date(proxy_address: str, date: str) -> dict[str, list[dict[str, str]] | dict[str, str]]: + proxy = {"http": proxy_address, "https": proxy_address} + nsa_scraper = NSAScraper( + user_agent=UserAgent(limit=1000).get_random_user_agent().encode("utf-8").decode("utf-8"), + proxy_config=proxy, + ) + try: + documents = nsa_scraper.search_documents_for_date(date) + except Exception as e: + error_message = f"Failed to scrape documents for date {date}: {e}" + logger.error(f"Failed to scrape documents for date {date}: {e}; Error type: {type(e)}") + return {"error": {"date": date, "error": error_message, "error_type": type(e).__name__}} + if documents: + success = [] + for page_id, document_ids in documents.items(): + page_success = "FOUND" if document_ids is not None else "ERROR: Redirected" + success.append( + { + "date": date, + "page_id": page_id, + "success": page_success, + "document_ids": document_ids, + } + ) + else: + success = [ + { + "date": date, + "page_id": None, + "success": "NO_DOCUMENTS", + } + ] + return {"success": success} + + +if __name__ == "__main__": + typer.run(main)