From cb503a47a8306fc96c0d045368a0542e672c9b88 Mon Sep 17 00:00:00 2001
From: Struan Donald
Date: Wed, 21 Aug 2024 13:56:38 +0100
Subject: [PATCH] [SP] convert wrans intermediate XML to PW format

---
 pyscraper/sp_2024/__main__.py      |   9 ++
 pyscraper/sp_2024/convert_wrans.py | 172 +++++++++++++++++++++++++++++
 2 files changed, 181 insertions(+)
 create mode 100644 pyscraper/sp_2024/convert_wrans.py

diff --git a/pyscraper/sp_2024/__main__.py b/pyscraper/sp_2024/__main__.py
index f990f0b0..6d01632d 100644
--- a/pyscraper/sp_2024/__main__.py
+++ b/pyscraper/sp_2024/__main__.py
@@ -12,6 +12,7 @@
 import click
 
 from .convert import convert_xml_to_twfy
+from .convert_wrans import convert_wrans_xml_to_twfy
 from .download import fetch_debates_for_dates, fetch_wrans_for_dates
 from .parse import tidy_up_html
 from .parse_wrans import tidy_up_wrans_html
@@ -22,6 +23,7 @@
 download_dir = parldata / "cmpages" / "sp_2024" / "raw"
 parsed_dir = parldata / "cmpages" / "sp_2024" / "parsed"
 output_dir = parldata / "scrapedxml" / "sp-new"
+output_dir_wrans = parldata / "scrapedxml" / "sp-written"
 
 
 @click.group()
@@ -175,6 +177,13 @@ def wrans(
                 print(f"Parsing up {file}")
             tidy_up_wrans_html(file, parsed_dir)
 
+    if convert:
+        file_iterator = cache_dir_iterator(parsed_dir, start, end, partial_file_name)
+        for file in file_iterator:
+            if verbose:
+                print(f"Converting {file} to TheyWorkForYou format")
+            convert_wrans_xml_to_twfy(file, output_dir_wrans, verbose=verbose)
+
 
 if __name__ == "__main__":
     cli(prog_name="python -m pyscraper.sp_2024")
diff --git a/pyscraper/sp_2024/convert_wrans.py b/pyscraper/sp_2024/convert_wrans.py
new file mode 100644
index 00000000..6104a703
--- /dev/null
+++ b/pyscraper/sp_2024/convert_wrans.py
@@ -0,0 +1,172 @@
+"""
+Convert the structured data from Scottish Parliament to
+the XML format used by TheyWorkForYou
+
+Link to TWFY IDs for members.
+"""
+
+import datetime
+import re
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Optional
+
+from lxml import etree
+
+from .resolvenames import get_unique_person_id, is_member_vote
+
+# tag in the intermediate XML -> tag used in the TWFY output
+SPEECH_TAGS = {"question": "ques", "answer": "reply"}
+
+
+@dataclass
+class IDFactory:
+    """
+    Build the publicwhip ids for one day of written answers.
+
+    ref is the id of the question currently being converted and
+    q_num numbers the question/reply pairs emitted under it.
+    """
+
+    committee_slug: str
+    iso_date: str
+    ref: str = ""
+    base_id: str = "uk.org.publicwhip/spwa/"
+    q_num: int = -1
+
+    def _current_id(self) -> str:
+        # shared prefix of every id that belongs to the current question
+        return f"{self.base_id}{self.iso_date}.{self.ref}"
+
+    def set_ref(self, ref):
+        self.ref = ref
+
+    def get_next_major_id(self) -> str:
+        return f"{self.base_id}{self.iso_date}.mh"
+
+    def get_next_minor_id(self) -> str:
+        # a new heading starts a new question, so restart the numbering
+        self.q_num = 0
+        return f"{self._current_id()}.h"
+
+    def get_next_q_id(self) -> str:
+        return f"{self._current_id()}.q{self.q_num}"
+
+    def get_next_r_id(self) -> str:
+        # a reply shares its number with the question it answers, so the
+        # counter only advances once the reply id has been handed out
+        reply_id = f"{self._current_id()}.r{self.q_num}"
+        self.q_num += 1
+        return reply_id
+
+
+def _build_speech(subitem, speech_id, url, iso_date, missing_speakers, verbose):
+    """
+    Build the ques/reply element for one question/answer element of the
+    intermediate XML, moving the children of subitem into it.
+    """
+    speaker_name = subitem.get("speaker_name")
+    person_id = get_unique_person_id(speaker_name, iso_date)
+    if person_id is None and speaker_name not in missing_speakers and verbose:
+        print(f"Could not find person id for {speaker_name}")
+        missing_speakers.add(speaker_name)
+
+    speech = etree.Element(SPEECH_TAGS[subitem.tag])
+    speech.set("id", speech_id)
+    speech.set("url", url)
+    speech.set("speakername", speaker_name)
+    speech.set("person_id", person_id or "unknown")
+    for child in subitem:
+        speech.append(child)
+    return speech
+
+
+def convert_wrans_xml_to_twfy(file_path: Path, output_dir: Path, verbose: bool = False):
+    """
+    Convert from the loose structured xml format to the
+    TWFY xml format
+    """
+    if verbose:
+        print(f"Converting {file_path}")
+
+    # parse from bytes: lxml rejects a decoded str that still carries an
+    # encoding declaration, and bytes let it honour the declared encoding
+    source = etree.fromstring(file_path.read_bytes())
+
+    # root of the tree is a publicwhip object
+    root = etree.Element("publicwhip")
+
+    iso_date = source.get("date")
+
+    # get the date in the format Thursday 09 June 2005 (%d zero-pads)
+    date_str = datetime.date.fromisoformat(iso_date).strftime("%A %d %B %Y")
+
+    committee_slug = "sp-written"
+
+    # NOTE(review): the wrans command already passes a directory called
+    # sp-written, so this nests sp-written/sp-written - confirm intended
+    dest_path = output_dir / committee_slug / f"spwa{iso_date}.xml"
+    dest_path.parent.mkdir(parents=True, exist_ok=True)
+
+    id_factory = IDFactory(committee_slug=committee_slug, iso_date=iso_date)
+
+    # there are only questions for the day, so one major heading covers it
+    major_heading = etree.Element("major-heading")
+    major_heading.set("id", id_factory.get_next_major_id())
+    major_heading.set("nospeaker", "True")
+    major_heading.text = f"Written Questions for {date_str}"
+    root.append(major_heading)
+
+    # kept for the whole file so each unmatched name is reported once
+    missing_speakers = set()
+
+    # iterate through the questions
+    for item in source.iter("spwrans"):
+        id_factory.set_ref(item.get("id"))
+
+        # each question is a minor heading using the id as the title because
+        # we don't have anything else to use
+        minor_heading = etree.Element("minor-heading")
+        minor_heading.set("id", id_factory.get_next_minor_id())
+        minor_heading.text = f"Question {item.get('id')}"
+        root.append(minor_heading)
+
+        parsed = item.find("parsed")
+        if parsed is None:
+            # nothing was parsed for this question: keep just the heading
+            continue
+
+        url = item.get("url") or ""
+        for subitem in parsed:
+            if subitem.tag == "question":
+                speech_id = id_factory.get_next_q_id()
+            elif subitem.tag == "answer":
+                speech_id = id_factory.get_next_r_id()
+            else:
+                continue
+            speech = _build_speech(
+                subitem, speech_id, url, iso_date, missing_speakers, verbose
+            )
+            root.append(speech)
+
+    # write the new xml to a file
+    etree.indent(root, space=" ")
+
+    with dest_path.open("wb") as f:
+        f.write(etree.tostring(root, pretty_print=True))
+
+
+def convert_to_twfy(
+    cache_dir: Path,
+    output_dir: Path,
+    partial_file_name: Optional[str] = None,
+    verbose: bool = False,
+):
+    """
+    Convert every intermediate wrans XML file in cache_dir to the TWFY
+    format. When partial_file_name is given only files whose name starts
+    with it are converted.
+    """
+    pattern = f"{partial_file_name}*" if partial_file_name else "*.xml"
+    for xml in sorted(cache_dir.glob(pattern)):
+        convert_wrans_xml_to_twfy(xml, output_dir, verbose=verbose)