diff --git a/deepwell/scripts/importer/.gitignore b/deepwell/importer/.gitignore similarity index 100% rename from deepwell/scripts/importer/.gitignore rename to deepwell/importer/.gitignore diff --git a/deepwell/importer/__init__.py b/deepwell/importer/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/deepwell/importer/__main__.py b/deepwell/importer/__main__.py new file mode 100644 index 0000000000..76d6ff94b6 --- /dev/null +++ b/deepwell/importer/__main__.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 + +import argparse +import logging +import os +import sys + +from .importer import Importer +from .wikicomma_config import parse_config + +LOG_FORMAT = "[%(levelname)s] %(asctime)s %(name)s: %(message)s" +LOG_DATE_FORMAT = "%Y/%m/%d %H:%M:%S" +LOG_FILENAME = "import.log" +LOG_FILE_MODE = "a" + +if __name__ == "__main__": + argparser = argparse.ArgumentParser(description="WikiComma importer") + argparser.add_argument( + "-q", + "--quiet", + "--no-stdout", + dest="stdout", + action="store_false", + help="Don't output to standard out", + ) + argparser.add_argument( + "--log", + dest="log_file", + default=LOG_FILENAME, + help="The log file to write to", + ) + argparser.add_argument( + "-c", + "--config", + dest="wikicomma_config", + required=True, + help="The configuration JSON that Wikicomma uses", + ) + argparser.add_argument( + "-d", + "--directory", + "--wikicomma-directory", + dest="wikicomma_directory", + required=True, + help="The directory where Wikicomma data resides", + ) + argparser.add_argument( + "-o", + "--sqlite", + "--output-sqlite", + dest="sqlite_path", + required=True, + help="The location to output the SQLite database to", + ) + argparser.add_argument( + "-D", + "--delete-sqlite", + action="store_true", + help="Delete the output SQLite before starting operations", + ) + argparser.add_argument( + "-b", + "--bucket", + "--s3-bucket", + dest="s3_bucket", + required=True, + help="The S3 bucket to store uploaded files in", + ) + argparser.add_argument( + "-P", + "--profile", + "--aws-profile", + dest="aws_profile", + required=True, + help="The AWS profile containing the secrets", + ) + args = argparser.parse_args() + + log_fmtr = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT) + log_file = logging.FileHandler( + filename=args.log_file, + encoding="utf-8", + mode=LOG_FILE_MODE, + ) + log_file.setFormatter(log_fmtr) + + logger = logging.getLogger(__package__) + logger.setLevel(level=logging.DEBUG) + logger.addHandler(log_file) + + if args.stdout: + log_stdout = logging.StreamHandler(sys.stdout) + log_stdout.setFormatter(log_fmtr) + logger.addHandler(log_stdout) + + wikicomma_config = parse_config(args.wikicomma_config) + + importer = Importer( + wikicomma_config=wikicomma_config, + wikicomma_directory=args.wikicomma_directory, + sqlite_path=args.sqlite_path, + delete_sqlite=args.delete_sqlite, + s3_bucket=args.s3_bucket, + aws_profile=args.aws_profile, + ) + importer.run() diff --git a/deepwell/importer/database.py b/deepwell/importer/database.py new file mode 100644 index 0000000000..6e859bf59e --- /dev/null +++ b/deepwell/importer/database.py @@ -0,0 +1,634 @@ +import json +import logging +import os +import sqlite3 +from typing import Optional + +from .wikicomma_config import SiteData +from .utils import kangaroo_twelve, from_js_timestamp + +logger = logging.getLogger(__name__) + +ANONYMOUS_USER_ID = 2 + + +class Database: + __slots__ = ("conn",) + + def __init__(self, db_url: str, delete: bool = False) -> None: + if delete: + if
os.path.exists(db_url): + logger.debug("Deleting previous SQLite at %s", db_url) + os.remove(db_url) + + self.conn = sqlite3.connect(db_url) + + def seed(self) -> None: + seed_path = os.path.join(os.path.dirname(__file__), "seed.sql") + + with open(seed_path) as file: + self.conn.executescript(file.read()) + + def close(self) -> None: + self.conn.close() + + def add_user_block(self, block: dict, filename: str) -> None: + logger.info("Found %d users in block '%s'", len(block), filename) + + with self.conn as cur: + # key is redundant, string of user ID + for data in block.values(): + self.add_user(cur, data) + + def add_text(self, cur, contents: str) -> str: + logger.debug("Adding text entry (len %d)", len(contents)) + + hex_hash = kangaroo_twelve(contents) + cur.execute( + """ + INSERT INTO text + (hex_hash, contents) + VALUES + (?, ?) + ON CONFLICT + DO NOTHING + """, + (hex_hash, contents), + ) + return hex_hash + + def add_site(self, *, slug: str, descr: str, url: str, id: int) -> None: + logger.info( + "Inserting site '%s' (%s, %d)", + descr, + slug, + id, + ) + + with self.conn as cur: + cur.execute( + """ + INSERT INTO site + ( + site_slug, + site_id, + site_descr, + site_url + ) + VALUES + (?, ?, ?, ?) + ON CONFLICT + DO UPDATE + SET + site_descr = ?, + site_url = ? + """, + ( + slug, + id, + descr, + url, + descr, + url, + ), + ) + + def add_user(self, cur, data: dict) -> None: + logger.info( + "Inserting user '%s' (%s, %d)", + data["full_name"], + data["username"], + data["user_id"], + ) + + cur.execute( + """ + INSERT INTO user + ( + user_slug, + user_name, + user_id, + user_since, + account_type, + karma, + fetched_at, + real_name, + gender, + birthday, + location, + website + ) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT + DO NOTHING + """, + ( + data["username"], # slug (e.g. foo-bar) + data["full_name"], # name (e.g. Foo Bar) + data["user_id"], + data["wikidot_user_since"], + data["account_type"], + data["activity"], + data["fetched_at"] // 1000, + data.get("real_name"), + data.get("gender"), + from_js_timestamp(data.get("birthday")), + data.get("location"), + data.get("website"), + ), + ) + + def add_page(self, cur, *, site_slug: str, page_descr: str, metadata: dict) -> None: + logger.info( + "Inserting into site '%s' page descr '%s'", + site_slug, + page_descr, + ) + + page_id = metadata["page_id"] + sitemap_updated_at = metadata["sitemap_update"] // 1000 + + # If a page has been moved, it can leave multiple entries. + # We want the most recent page if we find such entries. + result = cur.execute( + """ + SELECT page_descr, sitemap_updated_at + FROM page + WHERE page_id = ? + AND site_slug = ? + """, + (page_id, site_slug), + ).fetchone() + if result is not None: + (prior_page_descr, last_sitemap_updated_at) = result + if last_sitemap_updated_at < sitemap_updated_at: + logger.warning( + "Found updated version of page ID %d, deleting previous '%s' (%d < %d)", + page_id, + prior_page_descr, + last_sitemap_updated_at, + sitemap_updated_at, + ) + cur.execute( + """ + DELETE FROM page + WHERE page_id = ? + AND site_slug = ? 
+ """, + (page_id, site_slug), + ) + self.add_deleted_page( + cur, + page_descr=prior_page_descr, + site_slug=site_slug, + page_id=page_id, + ) + else: + logger.warning( + "Found another version of page ID %d, looks newer, skipping (%d ≥ %d)", + page_id, + last_sitemap_updated_at, + sitemap_updated_at, + ) + self.add_deleted_page( + cur, + page_descr=page_descr, + site_slug=site_slug, + page_id=page_id, + ) + return + + # Insert new page + cur.execute( + """ + INSERT INTO page + ( + page_id, + page_descr, + page_slug, + site_slug, + sitemap_updated_at, + title, + locked, + tags + ) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + page_id, + page_descr, + metadata["name"], + site_slug, + sitemap_updated_at, + metadata.get("title", ""), + metadata.get("is_locked", False), + json.dumps(metadata.get("tags", [])), + ), + ) + + def add_deleted_page( + self, + cur, + *, + page_descr: str, + site_slug: str, + page_id: int, + ) -> None: + logger.debug( + "Adding deleted page: %s / %s (%d)", + page_descr, + site_slug, + page_id, + ) + cur.execute( + """ + INSERT INTO page_deleted + ( + page_descr, + site_slug, + page_id + ) + VALUES + (?, ?, ?) + """, + (page_descr, site_slug, page_id), + ) + + def is_deleted_page(self, *, page_descr: str, site_slug: str) -> bool: + with self.conn as cur: + result = cur.execute( + """ + SELECT * + FROM page_deleted + WHERE page_descr = ? + AND site_slug = ? + """, + (page_descr, site_slug), + ).fetchone() + + exists = result is not None + logger.debug( + "Checking if page descr %s exists in site %s: %s", + page_descr, + site_slug, + exists, + ) + return exists + + def add_page_revision_metadata(self, cur, page_id: int, data: dict) -> None: + logger.info( + "Inserting page revision %d for page ID %d", + data["revision"], + page_id, + ) + + cur.execute( + """ + INSERT INTO page_revision + ( + revision_id, + revision_number, + page_id, + user_id, + created_at, + flags, + comments + ) + VALUES + (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT + DO NOTHING + """, + ( + data["global_revision"], + data["revision"], + page_id, + data["author"] or ANONYMOUS_USER_ID, + data["stamp"], + data["flags"], + data["commentary"], + ), + ) + + def add_page_revision_wikitext(self, cur, revision_id: int, contents: str) -> None: + logger.debug("Inserting page revision wikitext for %d", revision_id) + + hex_hash = self.add_text(cur, contents) + cur.execute( + """ + INSERT INTO page_revision_wikitext + (revision_id, wikitext_hash) + VALUES (?, ?) + ON CONFLICT + DO UPDATE + SET wikitext_hash = ? + """, + ( + revision_id, + hex_hash, + hex_hash, + ), + ) + + def add_page_vote( + self, + cur, + *, + page_id: int, + user_id: int, + value: int, + ) -> None: + logger.info( + "Inserting page vote for page ID %d / user ID %d (value %+d)", + page_id, + user_id, + value, + ) + + cur.execute( + """ + INSERT INTO page_vote + ( + page_id, + user_id, + value + ) + VALUES + (?, ?, ?) + ON CONFLICT + DO UPDATE + SET value = ? + """, + ( + page_id, + user_id, + value, + value, + ), + ) + + def add_blob(self, cur, *, hex_hash: str, length: int, mime: str) -> None: + logger.debug("Inserting blob record") + cur.execute( + """ + INSERT INTO blob + (hex_hash, mime, length) + VALUES (?, ?, ?) + """, + (hex_hash, mime, length), + ) + + def blob_exists(self, hex_hash: str) -> bool: + with self.conn as cur: + result = cur.execute( + """ + SELECT * + FROM blob + WHERE hex_hash = ? 
+ """, + (hex_hash,), + ).fetchone() + + exists = result is not None + logger.debug("Checking blob existence: %s (%s)", hex_hash, exists) + return exists + + def add_file( + self, + cur, + *, + file_id: int, + page_id: int, + site_slug: str, + filename: str, + s3_hash: str, + ) -> None: + logger.info("Inserting file for page ID %d", page_id) + + cur.execute( + """ + INSERT INTO file + ( + file_id, + page_id, + site_slug, + filename, + s3_hash + ) + VALUES + (?, ?, ?, ?, ?) + ON CONFLICT + DO UPDATE + SET filename = ?, + s3_hash = ? + """, + ( + file_id, + page_id, + site_slug, + filename, + s3_hash, + filename, + s3_hash, + ), + ) + + def add_forum_category( + self, + cur, + site_slug: str, + metadata: dict, + ) -> None: + forum_category_id = metadata["id"] + logger.info("Inserting forum category ID %d", forum_category_id) + + cur.execute( + """ + INSERT INTO forum_category + ( + forum_category_id, + site_slug, + title, + description, + last_user_id, + thread_count, + post_count, + full_scan, + last_page, + version + ) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT + DO NOTHING + """, + ( + forum_category_id, + site_slug, + metadata["title"], + metadata["description"], + metadata.get("lastUser"), + metadata.get("threads"), + metadata.get("posts"), + metadata["full_scan"], + metadata["last_page"], + metadata["version"], + ), + ) + + def add_forum_thread(self, cur, forum_category_id: int, metadata: dict) -> None: + forum_thread_id = metadata["id"] + logger.info("Inserting forum thread ID %d", forum_thread_id) + + cur.execute( + """ + INSERT INTO forum_thread + ( + forum_thread_id, + forum_category_id, + title, + description, + created_at, + created_by, + post_count, + sticky, + locked, + version + ) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT + DO NOTHING + """, + ( + forum_thread_id, + forum_category_id, + metadata["title"], + metadata["description"], + metadata["started"], + metadata["startedUser"], + metadata["postsNum"], + metadata["sticky"], + metadata.get("isLocked", False), + metadata.get("version"), + ), + ) + + def add_forum_post( + self, + cur, + *, + forum_thread_id: int, + parent_post_id: Optional[int], + metadata: dict, + ) -> None: + forum_post_id = metadata["id"] + logger.info("Inserting forum post ID %d", forum_post_id) + + cur.execute( + """ + INSERT INTO forum_post + ( + forum_post_id, + forum_thread_id, + parent_post_id, + title, + created_at, + created_by, + edited_at, + edited_by + ) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT + DO NOTHING + """, + ( + forum_post_id, + forum_thread_id, + parent_post_id, + metadata["title"], + metadata["stamp"], + metadata["poster"], + metadata.get("lastEdit"), + metadata.get("lastEditBy"), + ), + ) + + def add_forum_post_revision(self, cur, post_id: int, metadata: dict) -> None: + revision_id = metadata["id"] + logger.info("Inserting forum post ID %d (revision ID %d)", post_id, revision_id) + + cur.execute( + """ + INSERT INTO forum_post_revision + ( + forum_post_revision_id, + forum_post_id, + title, + created_at, + created_by + ) + VALUES + (?, ?, ?, ?, ?) + ON CONFLICT + DO NOTHING + """, + ( + revision_id, + post_id, + metadata["title"], + metadata["stamp"], + metadata["author"], + ), + ) + + def add_forum_post_wikitext(self, cur, forum_post_id: int, contents: str): + logger.info("Inserting latest forum post wikitext for ID %d", forum_post_id) + hex_hash = self.add_text(cur, contents) + + cur.execute( + """ + INSERT INTO forum_post_wikitext + ( + forum_post_id, + wikitext_hash + ) + VALUES + (?, ?) 
+ ON CONFLICT + DO UPDATE + SET wikitext_hash = ? + """, + (forum_post_id, hex_hash, hex_hash), + ) + + def add_forum_post_revision_wikitext( + self, + cur, + forum_post_revision_id: int, + contents: str, + ): + logger.info( + "Inserting forum post revision wikitext for ID %d", + forum_post_revision_id, + ) + hex_hash = self.add_text(cur, contents) + + cur.execute( + """ + INSERT INTO forum_post_revision_wikitext + ( + forum_post_revision_id, + wikitext_hash + ) + VALUES + (?, ?) + ON CONFLICT + DO UPDATE + SET wikitext_hash = ? + """, + (forum_post_revision_id, hex_hash, hex_hash), + ) diff --git a/deepwell/importer/importer.py b/deepwell/importer/importer.py new file mode 100644 index 0000000000..771af5af10 --- /dev/null +++ b/deepwell/importer/importer.py @@ -0,0 +1,90 @@ +import hashlib +import json +import logging +import os + +from .database import Database +from .s3 import S3 +from .site import SiteImporter + +logger = logging.getLogger(__name__) + + +class Importer: + __slots__ = ( + "logger", + "wikicomma_config", + "wikicomma_directory", + "database", + "s3", + ) + + def __init__( + self, + *, + wikicomma_config, + wikicomma_directory, + sqlite_path, + delete_sqlite, + aws_profile, + s3_bucket, + ) -> None: + self.wikicomma_config = wikicomma_config + self.wikicomma_directory = wikicomma_directory + self.database = Database(sqlite_path, delete=delete_sqlite) + self.s3 = S3(aws_profile=aws_profile, bucket=s3_bucket, database=self.database) + + def run(self) -> None: + logger.info("Starting Wikicomma importer...") + + self.database.seed() + self.process_users() + self.process_sites() + + def close(self) -> None: + self.database.close() + + def process_users(self) -> None: + logger.info("Processing users...") + + directory = os.path.join(self.wikicomma_directory, "_users") + for filename in os.listdir(directory): + if filename == "pending.json": + logger.debug("Skipping pending user list") + continue + + path = os.path.join(directory, filename) + logger.debug("Reading %s", path) + with open(path) as file: + data = json.load(file) + + self.database.add_user_block(data, filename) + + def process_sites(self) -> None: + logger.info("Processing sites...") + + for site_descr in os.listdir(self.wikicomma_directory): + if site_descr == "_users": + logger.debug("Skipping '_users', not a site") + continue + elif site_descr.endswith(".torrent"): + logger.debug("Skipping torrent file from Wikicomma sync") + continue + + # NOTE: site_descr != site_slug + self.process_site(site_descr) + + def process_site(self, site_descr: str) -> None: + logger.info("Processing site '%s'...", site_descr) + directory = os.path.join(self.wikicomma_directory, site_descr) + + site_data = self.wikicomma_config.sites[site_descr] + site_importer = SiteImporter( + directory=directory, + database=self.database, + s3=self.s3, + site_descr=site_data.descr, + site_slug=site_data.slug, + site_url=site_data.url, + ) + site_importer.run() diff --git a/deepwell/importer/requirements.txt b/deepwell/importer/requirements.txt new file mode 100644 index 0000000000..e526254bd6 --- /dev/null +++ b/deepwell/importer/requirements.txt @@ -0,0 +1,4 @@ +boto3>=1.34.0 +py7zr>=0.21.0 +pycryptodome>=3.20.0 +python-magic>=0.4.0 diff --git a/deepwell/importer/s3.py b/deepwell/importer/s3.py new file mode 100644 index 0000000000..0506d86586 --- /dev/null +++ b/deepwell/importer/s3.py @@ -0,0 +1,52 @@ +import hashlib +import logging + +import boto3 + +logger = logging.getLogger(__name__) + + +class S3: + __slots__ = ( + "aws_profile", + 
"session", + "client", + "bucket", + "database", + ) + + def __init__(self, *, aws_profile, bucket, database) -> None: + self.aws_profile = aws_profile + self.session = boto3.Session(profile_name=aws_profile) + self.client = self.session.client("s3") + self.bucket = bucket + self.database = database + + def upload(self, file_path: str, mime: str) -> str: + with open(file_path, "rb") as file: + data = file.read() + # files use SHA256, text uses K12 + s3_path = hashlib.sha256(data).hexdigest() + + if not data: + logger.debug("Skipping upload of empty S3 object") + elif self.database.blob_exists(s3_path): + logger.debug("S3 object %s already exists", s3_path) + else: + logger.info("Uploading S3 object %s (len %s)", s3_path, f"{len(data):,}") + self.client.put_object( + Bucket=self.bucket, + Key=s3_path, + Body=data, + ContentLength=len(data), + ) + + with self.database.conn as cur: + self.database.add_blob( + cur, + hex_hash=s3_path, + length=len(data), + mime=mime, + ) + + return s3_path diff --git a/deepwell/importer/seed.sql b/deepwell/importer/seed.sql new file mode 100644 index 0000000000..bd6a82c2e8 --- /dev/null +++ b/deepwell/importer/seed.sql @@ -0,0 +1,143 @@ +CREATE TABLE blob ( + hex_hash TEXT PRIMARY KEY, + mime TEXT NOT NULL, + length INTEGER NOT NULL +); + +CREATE TABLE text ( + hex_hash TEXT PRIMARY KEY, + contents TEXT NOT NULL +); + +CREATE TABLE user ( + user_slug TEXT PRIMARY KEY, + user_name TEXT NOT NULL, + user_id INTEGER NOT NULL UNIQUE, + user_since INTEGER NOT NULL, + account_type TEXT NOT NULL, + karma INTEGER NOT NULL, + fetched_at INTEGER NOT NULL, + real_name TEXT, + gender TEXT, + birthday INTEGER, + location TEXT, + website TEXT +); + +CREATE TABLE site ( + site_slug TEXT PRIMARY KEY, + site_descr TEXT NOT NULL, -- Wikicomma name + site_url TEXT NOT NULL, + site_id INTEGER NOT NULL +); + +CREATE TABLE page ( + page_id INTEGER PRIMARY KEY, + page_descr TEXT NOT NULL, + page_slug TEXT NOT NULL, + site_slug TEXT NOT NULL REFERENCES site(site_slug), + sitemap_updated_at INTEGER NOT NULL, + title TEXT NOT NULL, + locked INTEGER NOT NULL CHECK (locked IN (0, 1)), -- boolean + tags TEXT NOT NULL, -- JSON + + UNIQUE (site_slug, page_descr), + UNIQUE (site_slug, page_slug) +); + +CREATE TABLE page_deleted ( + page_descr TEXT, + site_slug TEXT, + page_id INTEGER NOT NULL, + + PRIMARY KEY (site_slug, page_descr) +); + +CREATE TABLE page_revision ( + revision_id INTEGER PRIMARY KEY, + revision_number INTEGER NOT NULL CHECK (revision_number >= 0), + page_id INTEGER NOT NULL REFERENCES page(page_id), + user_id INTEGER NOT NULL REFERENCES user(user_id), + created_at INTEGER NOT NULL, + flags TEXT NOT NULL, + comments TEXT NOT NULL, + + UNIQUE (page_id, revision_number) +); + +CREATE TABLE page_revision_wikitext ( + revision_id INTEGER PRIMARY KEY REFERENCES page_revision(revision_id), + wikitext_hash TEXT NOT NULL REFERENCES text(hex_hash) +); + +CREATE TABLE page_vote ( + page_id INTEGER REFERENCES page(page_id), + user_id INTEGER REFERENCES user(user_id), + value INTEGER NOT NULL, + + PRIMARY KEY (page_id, user_id) +); + +CREATE TABLE file ( + file_id INTEGER PRIMARY KEY, + page_id INTEGER NOT NULL REFERENCES page(page_id), + site_slug TEXT NOT NULL REFERENCES site(site_slug), + filename TEXT NOT NULL, + s3_hash TEXT NOT NULL REFERENCES blob(hex_hash) +); + +CREATE TABLE forum_category ( + forum_category_id INTEGER PRIMARY KEY, + site_slug TEXT NOT NULL REFERENCES site(site_slug), + title TEXT NOT NULL, + description TEXT NOT NULL, + last_user_id INTEGER REFERENCES 
user(user_id), + thread_count INTEGER, + post_count INTEGER, + full_scan INTEGER NOT NULL CHECK (full_scan IN (0, 1)), -- boolean + last_page INTEGER NOT NULL, + version INTEGER NOT NULL +); + +CREATE TABLE forum_thread ( + forum_thread_id INTEGER PRIMARY KEY, + forum_category_id INTEGER NOT NULL REFERENCES forum_category(forum_category_id), + title TEXT NOT NULL, + description TEXT NOT NULL, + created_at INTEGER NOT NULL, + created_by INTEGER REFERENCES user(user_id), -- NULL means wikidot + post_count INTEGER NOT NULL, + sticky INTEGER NOT NULL CHECK (sticky IN (0, 1)), -- boolean + locked INTEGER NOT NULL CHECK (locked IN (0, 1)), -- boolean + version INTEGER +); + +CREATE TABLE forum_post ( + forum_post_id INTEGER PRIMARY KEY, + forum_thread_id INTEGER NOT NULL REFERENCES forum_thread(forum_thread_id), + parent_post_id INTEGER REFERENCES forum_post(forum_post_id), + title TEXT NOT NULL, + created_at INTEGER NOT NULL, + created_by INTEGER NOT NULL REFERENCES user(user_id), + edited_at INTEGER, + edited_by INTEGER REFERENCES user(user_id) +); + +-- For the latest post revision's wikitext +CREATE TABLE forum_post_wikitext ( + forum_post_id INTEGER PRIMARY KEY REFERENCES forum_post(forum_post_id), + wikitext_hash TEXT NOT NULL REFERENCES text(hex_hash) +); + +CREATE TABLE forum_post_revision ( + forum_post_revision_id INTEGER PRIMARY KEY, + forum_post_id INTEGER NOT NULL REFERENCES forum_post(forum_post_id), + title TEXT NOT NULL, + created_at INTEGER NOT NULL, + created_by INTEGER NOT NULL REFERENCES user(user_id) +); + +CREATE TABLE forum_post_revision_wikitext ( + forum_post_revision_id INTEGER PRIMARY KEY REFERENCES forum_post_revision(forum_post_revision_id), + wikitext_hash TEXT NOT NULL REFERENCES text(hex_hash) +); diff --git a/deepwell/importer/site.py b/deepwell/importer/site.py new file mode 100644 index 0000000000..8527f0c35d --- /dev/null +++ b/deepwell/importer/site.py @@ -0,0 +1,525 @@ +import json +import logging +import os +import re +from functools import cache +from io import BytesIO +from typing import Optional, Tuple, Union +from urllib.parse import unquote as percent_unquote +from urllib.request import urlopen + +import magic +import py7zr + +from .database import Database +from .s3 import S3 + +SITE_ID_REGEX = re.compile(r"WIKIREQUEST\.info\.siteId = (\d+);") + +logger = logging.getLogger(__name__) + + +class SiteImporter: + __slots__ = ( + "directory", + "database", + "s3", + "site_descr", + "site_slug", + "site_url", + "site_id", + "file_metadata", + ) + + def __init__( + self, + *, + directory: str, + database: Database, + s3: S3, + site_descr: str, + site_slug: str, + site_url: str, + ) -> None: + self.directory = directory + self.database = database + self.s3 = s3 + self.site_descr = site_descr + self.site_slug = site_slug + self.site_url = site_url + self.site_id = self.get_site_id(site_url) + self.file_metadata = {} + + @cache + def get_site_id(self, site_url: str) -> int: + with self.database.conn as cur: + result = cur.execute( + """ + SELECT site_id FROM site + WHERE site_url = ?
+ """, + (site_url,), + ).fetchone() + + if result is not None: + site_id = result[0] + logger.debug("Found site ID for URL %s: %d", site_url, site_id) + return site_id + + logger.info("Downloading web page %s to scrape site ID", site_url) + + with urlopen(site_url) as file: + html = file.read().decode("utf-8") + + match = SITE_ID_REGEX.search(html) + if match is None: + logger.error("Unable to find site ID in HTML") + raise ValueError(site_url) + + return int(match[1]) + + def get_page_id(self, *, page_slug: str = None, page_descr: str = None) -> int: + with self.database.conn as cur: + match bool(page_slug), bool(page_descr): + case True, False: + query = """ + SELECT page_id + FROM page + WHERE page_slug = ? + AND site_slug = ? + """ + parameters = (page_slug, self.site_slug) + case False, True: + query = """ + SELECT page_id + FROM page + WHERE page_descr = ? + AND site_slug = ? + """ + parameters = (page_descr, self.site_slug) + case _, _: + raise ValueError( + "Must pass exactly one parameter into get_page_id()", + ) + + result = cur.execute(query, parameters).fetchone() + + if result is None: + raise RuntimeError( + f"Cannot find page ID for page_descr={page_descr} / page_slug={page_slug} in site '{self.site_slug}'", + ) + + (page_id,) = result + return page_id + + def get_page_descr(self, page_id: int) -> str: + with self.database.conn as cur: + result = cur.execute( + """ + SELECT page_metadata.page_descr + FROM page + JOIN page_metadata + ON page.page_id = page_metadata.page_id + WHERE page_metadata.page_id = ? + AND page.site_slug = ? + """, + (page_id, self.site_slug), + ).fetchone() + + if result is None: + raise RuntimeError( + f"Cannot find page descr for page ID {page_id} in site '{self.site_slug}'", + ) + + (page_descr,) = result + return page_descr + + def get_revision_id(self, cur, page_id: int, revision_number: int) -> int: + result = cur.execute( + """ + SELECT revision_id + FROM page_revision + WHERE page_id = ? + AND revision_number = ? + """, + (page_id, revision_number), + ).fetchone() + if result is None: + raise RuntimeError( + f"Cannot find page revision for (page {page_id}, rev {revision_number})", + ) + (revision_id,) = result + return revision_id + + @property + def file_dir(self) -> str: + return os.path.join(self.directory, "files") + + @property + def forum_dir(self) -> str: + return os.path.join(self.directory, "forum") + + @property + def page_dir(self) -> str: + return os.path.join(self.directory, "pages") + + def meta_path(self, *paths: str) -> str: + return os.path.join(self.directory, "meta", *paths) + + def json(self, path: str) -> Union[list, dict]: + with open(path) as file: + return json.load(file) + + def run(self) -> None: + self.database.add_site( + slug=self.site_slug, + descr=self.site_descr, + url=self.site_url, + id=self.site_id, + ) + self.process_pages() + self.process_files() + self.process_forum() + + def process_pages(self) -> None: + self.process_page_metadata() + self.process_page_wikitext() + + def process_page_metadata(self) -> None: + logger.info("Ingesting page revision metadata for site %s", self.site_slug) + meta_directory = self.meta_path("pages") + for path in os.listdir(meta_directory): + with self.database.conn as cur: + logger.debug("Processing page metadata from '%s'", path) + + # NOTE: Usually page_slug is the same as page_descr, but if + # there are any colons in it, then they don't match. + # So we can use it as a temporary unique identifier + # but *not* as the slug. 
+ page_descr, ext = os.path.splitext(path) + assert ext == ".json", "Extension for page metadata not JSON" + path = os.path.join(meta_directory, path) + + metadata = self.json(path) + self.database.add_page( + cur, + site_slug=self.site_slug, + page_descr=page_descr, + metadata=metadata, + ) + + page_id = metadata["page_id"] + for file_metadata in metadata.get("files", ()): + file_id = file_metadata["file_id"] + self.file_metadata[file_id] = file_metadata + + self.process_page_revisions_metadata( + cur, + page_id, + metadata["revisions"], + ) + self.process_page_votes(cur, page_id, metadata["votings"]) + + def process_page_revisions_metadata( + self, + cur, + page_id: int, + revisions: list[dict], + ) -> None: + logger.debug("Ingesting page revision metadata for page ID %d", page_id) + for revision in revisions: + self.database.add_page_revision_metadata(cur, page_id, revision) + + def process_page_votes( + self, + cur, + page_id: int, + votes: list[Tuple[int, int]], + ) -> None: + logger.debug("Ingesting page votes for page ID %d", page_id) + for user_id, bool_value in votes: + int_value = 1 if bool_value else -1 + self.database.add_page_vote( + cur, + user_id=user_id, + page_id=page_id, + value=int_value, + ) + + def process_page_wikitext(self) -> None: + logger.info("Ingesting page wikitext for site %s", self.site_slug) + for path in os.listdir(self.page_dir): + logger.debug("Processing page wikitext from '%s'", path) + + # See above note on page_descr + page_descr, ext = os.path.splitext(path) + assert ext == ".7z", "Extension for page wikitexts not 7z" + path = os.path.join(self.page_dir, path) + + # Extract page sources for each revision + with py7zr.SevenZipFile(path, "r") as archive: + sources = archive.readall() + + if self.database.is_deleted_page( + page_descr=page_descr, + site_slug=self.site_slug, + ): + logger.warning( + "Page descr '%s' was previously deleted, skipping", + page_descr, + ) + continue + + try: + page_id = self.get_page_id(page_descr=page_descr) + except RuntimeError: + logger.error("No page descr '%s' found to insert wikitext", page_descr) + return + + # Convert and begin adding to the database + self.process_page_revisions_wikitext(page_id, sources) + + def process_page_revisions_wikitext( + self, + page_id: int, + sources: dict[str, BytesIO], + ) -> None: + logger.debug("Ingesting %d page revision wikitexts", len(sources)) + + with self.database.conn as cur: + for filename, buf in sources.items(): + # Get revision number from filename + revision_number_str, ext = os.path.splitext(filename) + assert ext == ".txt", "Extension for page revision wikitext not txt" + revision_number = int(revision_number_str) + logger.info("Ingesting page revision %d (%d)", page_id, revision_number) + + # Get revision ID + revision_id = self.get_revision_id(cur, page_id, revision_number) + + # Convert from binary, mostly to ensure it's UTF-8 + contents = buf.read().decode("utf-8") + + # Run ingestion for this revision + self.database.add_page_revision_wikitext(cur, revision_id, contents) + + def process_files(self) -> None: + logger.info("Ingesting files for site %s", self.site_slug) + + mapping = self.json(self.meta_path("file_map.json")) + with self.database.conn as cur: + for file_id_str, entry in mapping.items(): + file_id = int(file_id_str) + wikidot_url = entry["url"] + + logger.debug("Processing file stored at %s", wikidot_url) + page_slug_url, filename = os.path.split(entry["path"]) + page_slug = percent_unquote(page_slug_url) + + try: + page_id = 
self.get_page_id(page_slug=page_slug) + except RuntimeError: + logger.error( + "Cannot find associated page with slug '%s'", + page_slug, + ) + continue + + path = os.path.join(self.file_dir, page_slug_url, file_id_str) + + if not os.path.isfile(path): + logger.error( + "File in map but not downloaded: %s (%s)", + page_slug_url, + file_id_str, + ) + continue + + try: + file_metadata = self.file_metadata[file_id] + mime = file_metadata["mime"] + except KeyError: + # No data, get MIME via libmagic + mime = magic.from_file(path, mime=True) + + s3_hash = self.s3.upload(path, mime) + + self.database.add_file( + cur, + file_id=file_id, + page_id=page_id, + site_slug=self.site_slug, + filename=filename, + s3_hash=s3_hash, + ) + + def process_forum(self) -> None: + logger.info("Ingesting forum data for site %s", self.site_slug) + self.process_forum_categories() + self.process_forum_data() + self.process_forum_wikitext() + + def process_forum_categories(self) -> None: + logger.debug("Processing forum categories (metadata)") + directory = self.meta_path("forum", "category") + + if not os.path.isdir(directory): + logger.warning("No forum category metadata directory") + return + + with self.database.conn as cur: + for path in os.listdir(directory): + logger.debug("Processing forum category metadata from '%s'", path) + + forum_category_id_str, ext = os.path.splitext(path) + forum_category_id = int(forum_category_id_str) + assert ext == ".json", "Extension for forum category metadata not JSON" + path = os.path.join(directory, path) + + metadata = self.json(path) + self.database.add_forum_category(cur, self.site_slug, metadata) + + def process_forum_data(self) -> None: + logger.debug("Processing forum categories") + directory = self.meta_path("forum") + + if not os.path.isdir(directory): + logger.warning("No forum category parent directory") + return + + for path in os.listdir(directory): + logger.debug("Processing forum category directory '%s'", path) + + if path == "category": + # Special metadata directory, see above + continue + + forum_category_id = int(path) + thread_directory = os.path.join(directory, path) + + for path in os.listdir(thread_directory): + with self.database.conn as cur: + logger.debug( + "Processing forum thread directory '%s'", + thread_directory, + ) + + path = os.path.join(thread_directory, path) + thread_metadata = self.json(path) + + self.database.add_forum_thread( + cur, + forum_category_id, + thread_metadata, + ) + + for post in thread_metadata["posts"]: + self.process_post( + cur, + thread_id=thread_metadata["id"], + metadata=post, + ) + + def process_post( + self, + cur, + *, + thread_id: int, + parent_post_id: Optional[int] = None, + metadata: dict, + ) -> None: + logger.info( + "Processing forum post in %d (parent %s)", + thread_id, + parent_post_id, + ) + post_id = metadata["id"] + self.database.add_forum_post( + cur, + forum_thread_id=thread_id, + parent_post_id=parent_post_id, + metadata=metadata, + ) + + logger.debug("Found %d children in forum post", len(metadata["children"])) + for child_post in metadata["children"]: + self.process_post( + cur, + thread_id=thread_id, + parent_post_id=post_id, + metadata=child_post, + ) + + logger.debug("Found %d revisions for forum post", len(metadata["revisions"])) + if metadata["revisions"]: + metadata["revisions"].sort(key=lambda d: d["id"]) + + for revision in metadata["revisions"]: + self.database.add_forum_post_revision(cur, post_id, revision) + + def process_forum_wikitext(self) -> None: + logger.info("Ingesting forum 
wikitext for site %s", self.site_slug) + + if not os.path.isdir(self.forum_dir): + logger.warning("No forum directory for site") + return + + # Each forum category + for category_id_str in os.listdir(self.forum_dir): + logger.debug( + "Processing forum wikitext for category ID %s", + category_id_str, + ) + category_id = int(category_id_str) + directory = os.path.join(self.forum_dir, category_id_str) + + # Each forum thread + for path in os.listdir(directory): + thread_id_str, ext = os.path.splitext(path) + assert ext == ".7z", "Extension for forum wikitexts not 7z" + path = os.path.join(directory, path) + + thread_id = int(thread_id_str) + logger.debug( + "Processing forum wikitext for thread ID %s", + thread_id_str, + ) + + # Extract page sources for each post and revision + with py7zr.SevenZipFile(path, "r") as archive: + sources = archive.readall() + + # Convert and begin adding to the database + self.process_forum_revisions_wikitext(thread_id, sources) + + def process_forum_revisions_wikitext( + self, + thread_id: int, + sources: dict[str, BytesIO], + ) -> None: + logger.debug("Ingesting %d forum thread revision wikitexts", len(sources)) + + with self.database.conn as cur: + for path, buf in sources.items(): + post_id_str, filename = os.path.split(path) + revision, ext = os.path.splitext(filename) + assert ext == ".html", "Extension for forum revision HTML not html" + post_id = int(post_id_str) + + # Convert from binary, mostly to ensure it's UTF-8 + contents = buf.read().decode("utf-8").strip() + + # This is kind of a mess because we don't have + # forum post revision IDs for the latest revision. :( + + # Per-post wikitext + if revision == "latest": + self.database.add_forum_post_wikitext(cur, post_id, contents) + # Per-revision wikitext + else: + revision_id = int(revision) + self.database.add_forum_post_revision_wikitext( + cur, + revision_id, + contents, + ) diff --git a/deepwell/importer/utils.py b/deepwell/importer/utils.py new file mode 100644 index 0000000000..41d8f53d2a --- /dev/null +++ b/deepwell/importer/utils.py @@ -0,0 +1,16 @@ +from Crypto.Hash import KangarooTwelve + +from typing import Optional + + +def from_js_timestamp(value: Optional[int]) -> Optional[int]: + if value is None: + return None + else: + return value // 1000 + + +def kangaroo_twelve(input: str) -> str: + data = input.encode("utf-8") + hash = KangarooTwelve.new(custom=data) + return hash.read(26).hex() diff --git a/deepwell/importer/wikicomma_config.py b/deepwell/importer/wikicomma_config.py new file mode 100644 index 0000000000..2eadc155c3 --- /dev/null +++ b/deepwell/importer/wikicomma_config.py @@ -0,0 +1,37 @@ +import json +import logging +import re +from collections import namedtuple + +WIKIDOT_SITE_REGEX = re.compile(r"https?:\/\/([^\.]+)\.wikidot\.com\/?") + +WikicommaConfig = namedtuple("WikicommaConfig", ("sites",)) +SiteData = namedtuple("SiteData", ("descr", "slug", "url")) + +logger = logging.getLogger(__name__) + + +def parse_config(path: str) -> WikicommaConfig: + with open(path) as file: + data = json.load(file) + + sites = {} + logger.info("Found sites:") + for pair in data["wikis"]: + descr = pair["name"] + url = pair["url"] + + match = WIKIDOT_SITE_REGEX.match(url) + if match is None: + logger.error("Cannot parse site URL: %s", url) + raise ValueError(url) + slug = match[1] + logger.info("* %s ('%s')", slug, descr) + + sites[descr] = SiteData( + descr=descr, + slug=slug, + url=url, + ) + + return WikicommaConfig(sites=sites) diff --git a/deepwell/scripts/importer/README.md 
b/deepwell/scripts/importer/README.md deleted file mode 100644 index 639c94fe48..0000000000 --- a/deepwell/scripts/importer/README.md +++ /dev/null @@ -1,2 +0,0 @@ -## deepwell-importer -This is a Python framework to generate a SQL file which seeds a DEEPWELL database. The goal is to provide a simple, generic interface which can be used in the implementation of content seeders. diff --git a/deepwell/scripts/importer/__init__.py b/deepwell/scripts/importer/__init__.py deleted file mode 100644 index 5dd5e9bbd4..0000000000 --- a/deepwell/scripts/importer/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .generator import generate_seed -from .structures import * -from .scuttle import run_scuttle_import -from .wikicomma import run_wikicomma_import diff --git a/deepwell/scripts/importer/constants.py b/deepwell/scripts/importer/constants.py deleted file mode 100644 index 32bf86745c..0000000000 --- a/deepwell/scripts/importer/constants.py +++ /dev/null @@ -1,4 +0,0 @@ -from datetime import datetime - -ANONYMOUS_USER_ID = 3 -UNKNOWN_CREATION_DATE = datetime.utcfromtimestamp(0) diff --git a/deepwell/scripts/importer/counter.py b/deepwell/scripts/importer/counter.py deleted file mode 100644 index 445fab0904..0000000000 --- a/deepwell/scripts/importer/counter.py +++ /dev/null @@ -1,10 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class IncrementingCounter: - value: int = 0 - - def next(self) -> int: - self.value += 1 - return self.value diff --git a/deepwell/scripts/importer/generator.py b/deepwell/scripts/importer/generator.py deleted file mode 100644 index 4982f46390..0000000000 --- a/deepwell/scripts/importer/generator.py +++ /dev/null @@ -1,314 +0,0 @@ -import hashlib -from binascii import hexlify -from typing import Iterable, Optional, Set, Union - -from .constants import * -from .counter import IncrementingCounter -from .structures import * -from .utils import get_page_category, wikidot_id_or_auto - -import psycopg2 - - -class Generator: - """ - Generates SQL and S3 invocations. - - This produces a SQL file to ingest data into DEEPWELL, as well as a - shells cript which invokes the aws utility to upload data to S3. - - The class also tracks the state of all imported Wikidot data, - as encountered. This is necessary to avoid inserting - duplicate data. 
- """ - - __slots__ = ( - "sql_buffer", - "sh_buffer", - "cursor", - "s3_bucket", - "page_category_id", - "user_ids", - "user_slugs", - "site_ids", - "site_slugs", - "page_ids", - "page_slugs", - "page_revision_ids", - "page_revision_numbers", - "page_categories", - "file_names", - "blob_hashes", - "text_hashes", - ) - - def __init__(self, sql_buffer, sh_buffer, cursor, s3_bucket, last_page_category_id): - self.sql_buffer = sql_buffer - self.sh_buffer = sh_buffer - self.cursor = cursor - self.s3_bucket = s3_bucket - self.page_category_id = IncrementingCounter(last_page_category_id) - - self.user_ids, self.user_slugs = set(), set() # Set[int], Set[str] - self.site_ids, self.site_slugs = set(), set() # Set[int], Set[str] - self.page_ids, self.page_slugs = set(), set() # Set[int], Set[Tuple[int, str]] - self.page_revision_ids = set() # Set[int] - self.page_revision_numbers = set() # Set[Tuple[int, int]] - self.page_categories = {} # dict[Tuple[int, str], int] - self.file_names = set() # Set[Tuple[int, str]] - self.blob_hashes = {} # dict[bytes, str] - self.text_hashes = set() # Set[bytes] - - self.sql_buffer.write("-- AUTO-GENERATED FILE\n") - self.sh_buffer.write("# AUTO-GENERATED FILE\n") - - def format(self, query: str, parameters=()) -> str: - return self.cursor.mogrify(query, parameters).decode("utf-8") - - def append_sql(self, query: str, parameters=()): - sql_line = self.format(query, parameters) - self.sql_buffer.write(f"{sql_line};\n") - - def section_sql(self, name: str): - self.sql_buffer.write(f"\n\n--\n-- {name}\n--\n\n") - - def append_sh(self, data: bytes, data_hash: bytes): - def bash_escape(d: bytes) -> str: - r"""Bash-escape binary strings. e.g. $'\x00'""" - - inner = "".join(f"\\x{b:02x}" for b in d) - return f"$'{inner}'" - - data_hash_hex = hexlify(data_hash).decode("utf-8") - bucket_path = f"s3://{self.s3_bucket}/{data_hash_hex}" - - self.sh_buffer.write( - 'file="$(mktemp)"\n' - f"printf '%s' {bash_escape(data)} > \"$file\"\n" - f'aws cp "$file" {bucket_path}\n' - f'rm "$file"\n\n' - ) - - return bucket_path - - def section_sh(self, name: str): - self.sh_buffer.write(f"\n\n#\n# {name}\n#\n\n") - - def add_user(self, user: User): - if ( - self.id_exists(self.user_ids, user.wikidot_id) - or user.slug in self.user_slugs - ): - return - - avatar_path = self.add_blob(user.avatar) - - # TODO change over when user table changes, remaining fields - self.append_sql( - "INSERT INTO users (id, slug, username, avatar_path, created_at) VALUES (%s, %s, %s, %s)", - (wikidot_id_or_auto(user), user.slug, user.name, avatar_path, user.created_at), - ) - - self.id_add(self.user_ids, user.wikidot_id) - self.user_slugs.add(user.slug) - - def add_site(self, site: Site): - if ( - self.id_exists(self.site_ids, site.wikidot_id) - or site.slug in self.site_slugs - ): - return - - self.append_sql( - "INSERT INTO site (site_id, name, slug, subtitle, description) VALUES (%s, %s, %s, %s, %s)", - (wikidot_id_or_auto(site), site.name, site.slug, site.subtitle, site.description), - ) - - self.id_add(self.site_ids, site.wikidot_id) - self.site_slugs.add(site.slug) - - def add_page(self, page: Page): - if ( - self.id_exists(self.page_ids, page.wikidot_id) - or (page.site_id, page.slug) in self.page_slugs - ): - return - - page_category_id = self.add_page_category( - page.site_id, get_page_category(page.slug), - ) - - self.append_sql( - "INSERT INTO page (page_id, created_at, updated_at, site_id, page_category_id, slug, discussion_thread_id) VALUES (%s, %s, %s, %s, %s, %s, %s)", - ( - 
wikidot_id_or_auto(page), - page.created_at, - page.updated_at, - page.site_id, - page_category_id, - page.slug, - page.discussion_thread_id, - ), - ) - - self.id_add(self.page_ids, page.wikidot_id) - self.page_slugs.add((page.site_id, page.slug)) - - def add_page_revisions(self, revisions: Iterable[PageRevision]): - for revision in revisions: - self.add_page_revision(revision) - - def add_page_revision(self, revision: PageRevision): - if ( - self.id_exists(self.page_revision_ids, revision.wikidot_id) - or (revision.page_id, revision.revision_number) - in self.page_revision_numbers - ): - return - - if revision.flags == "N" or revision.revision_number == 0: - revision_type = "created" - elif revision.flags == "R": - revision_type = "move" - else: - revision_type = "regular" - - wikitext_hash = self.add_text(revision.wikitext) - compiled_hash = self.add_text(revision.html) - - # TODO per-revision fields? - self.append_sql( - "INSERT INTO page_revision (revision_id, revision_type, revision_number, created_at, page_id, site_id, user_id, wikitext_hash, compiled_hash, compiled_at, compiled_generator, slug, title, tags, comments) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", - ( - wikidot_id_or_auto(revision), - revision_type, - revision.revision_number, - revision.created_at, - revision.page_id, - revision.site_id, - revision.user_id, - wikitext_hash, - compiled_hash, - revision.created_at, - "Imported from Wikidot", - revision.slug, - revision.title, - revision.tags, - revision.comments, - ), - ) - - self.id_add(self.page_revision_ids, revision.wikidot_id) - self.page_revision_numbers.add((revision.page_id, revision.revision_number)) - - def add_page_votes(self, votes: Iterable[PageVote]): - for vote in votes: - self.add_page_vote(vote) - - def add_page_vote(self, vote: PageVote): - self.append_sql( - "INSERT INTO page_vote (created_at, page_id, user_id, value) VALUES (%s, %s, %s, %s)", - (UNKNOWN_CREATION_DATE, vote.page_id, vote.user_id, vote.value), - ) - - def add_page_lock(self, page_id: int, locked: bool = True): - if locked: - self.append_sql( - "INSERT INTO page_lock (created_at, lock_type, page_id, user_id, reason) VALUES (%s, %s, %s, %s, %s)", - ( - UNKNOWN_CREATION_DATE, - "wikidot", - page_id, - ANONYMOUS_USER_ID, - "Imported from Wikidot", - ), - ) - - def add_page_category(self, site_id: int, category_slug: str) -> int: - page_category_id = self.page_categories.get((site_id, category_slug)) - - if page_category_id is None: - page_category_id = self.page_category_id.next() - self.append_sql( - "INSERT INTO page_category (category_id, site_id, slug) VALUES (%s, %s, %s)", - (page_category_id, site_id, category_slug), - ) - - return page_category_id - - def add_file(self, file: File): - if ( - self.id_exists(self.file_ids, file.wikidot_id) - or (file.page_id, file.name) in self.file_names - ): - return - - self.append_sql( - "INSERT INTO file (file_id, created_at, name, page_id) VALUES (%s, %s, %s, %s)", - (wikidot_id_or_auto(file), file.created_at, file.name, file.page_id), - ) - self.file_names.add((file.page_id, file.name)) - - # TODO add forums - - def add_blob(self, data: bytes) -> str: - data_hash = hashlib.sha512(data).digest() - s3_url = self.blob_hashes.get(data_hash) - - if s3_url is None: - s3_url = self.append_sh(data, data_hash) - self.blob_hashes[data_hash] = s3_url - - return s3_url - - def add_text(self, text: str) -> bytes: - text_bytes = text.encode("utf-8") - text_hash = hashlib.sha512(text_bytes).digest() - - if text_hash not in 
self.text_hashes: - self.append_sql( - "INSERT INTO text (hash, contents) VALUES (%s, %s)", (text_hash, text), - ) - self.text_hashes.add(text_hash) - - return text_hash - - def id_exists(self, field: Set[int], id: Optional[int]) -> bool: - if id is None: - return False - - return id in field - - def id_add(self, field: Set[int], id: Optional[int]): - if id is None: - return - - field.add(id) - - -def generate_seed( - runner: callable, - *, - sql_path: str, - sh_path: str, - s3_bucket: str, - postgres_url: str, - last_page_category_id: int = 0, -): - """ - Given a function which takes a Generator, run through whatever backup and add all the relevant information. - The generator will ensure duplicate data is not added. - """ - - with open(sql_path, "w") as sql_file: - with open(sh_path, "w") as sh_file: - with psycopg2.connect(postgres_url) as connection: - with connection.cursor() as cursor: - generator = Generator( - sql_file, - sh_file, - cursor, - s3_bucket, - last_page_category_id, - ) - runner(generator) diff --git a/deepwell/scripts/importer/requirements.txt b/deepwell/scripts/importer/requirements.txt deleted file mode 100644 index 587febffe8..0000000000 --- a/deepwell/scripts/importer/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -psycopg2>=2.9.3 -py7zr>=0.19.0 diff --git a/deepwell/scripts/importer/scuttle.py b/deepwell/scripts/importer/scuttle.py deleted file mode 100644 index 8e6f248b0d..0000000000 --- a/deepwell/scripts/importer/scuttle.py +++ /dev/null @@ -1,5 +0,0 @@ -# TODO - - -def run_scuttle_import(): - raise NotImplementedError diff --git a/deepwell/scripts/importer/structures.py b/deepwell/scripts/importer/structures.py deleted file mode 100644 index 48c0a86ea5..0000000000 --- a/deepwell/scripts/importer/structures.py +++ /dev/null @@ -1,72 +0,0 @@ -from dataclasses import dataclass -from datetime import datetime -from typing import List, Optional - - -@dataclass(frozen=True) -class User: - # None means the ID isn't known, so we should assign our own - wikidot_id: Optional[int] - created_at: datetime - name: str - slug: str - avatar: bytes - - -@dataclass(frozen=True) -class Site: - wikidot_id: Optional[int] - created_at: datetime - name: str - slug: str - subtitle: str - description: str - - -@dataclass(frozen=True) -class Page: - wikidot_id: Optional[int] - created_at: datetime - updated_at: datetime - site_id: int - title: str - slug: str - discussion_thread_id: Optional[int] - - -@dataclass(frozen=True) -class PageRevision: - wikidot_id: Optional[int] - revision_number: int - created_at: datetime - flags: str - page_id: int - site_id: int - user_id: int - wikitext: str - html: str - slug: str - title: str - tags: List[str] - comments: str - - -@dataclass(frozen=True) -class PageVote: - page_id: int - user_id: int - value: int - - -@dataclass(frozen=True) -class File: - wikidot_id: Optional[int] - page_id: int - name: str - mime: str - size: int - user_id: int - created_at: datetime - - -# TODO forums diff --git a/deepwell/scripts/importer/utils.py b/deepwell/scripts/importer/utils.py deleted file mode 100644 index 17999a981b..0000000000 --- a/deepwell/scripts/importer/utils.py +++ /dev/null @@ -1,29 +0,0 @@ -from psycopg2.extensions import register_adapter, AsIs - - -def get_page_category(page_slug): - parts = page_slug.split(":") - if len(parts) == 1: - return "_default" - - return parts[0] - - -class SqlRaw: - __slots__ = ("value",) - - def __init__(self, value: str): - self.value = value - - def adapt(self): - return AsIs(self.value) - - -def 
wikidot_id_or_auto(item): - if item.wikidot_id is None: - return SqlRaw("DEFAULT") - else: - return item.wikidot_id - - -register_adapter(SqlRaw, SqlRaw.adapt) diff --git a/deepwell/scripts/importer/wikicomma.py b/deepwell/scripts/importer/wikicomma.py deleted file mode 100644 index 5a417dd037..0000000000 --- a/deepwell/scripts/importer/wikicomma.py +++ /dev/null @@ -1,287 +0,0 @@ -import json -import logging -import os -import re -from datetime import datetime - -from .constants import UNKNOWN_CREATION_DATE -from .generator import generate_seed -from .structures import * - -from py7zr import SevenZipFile - -REVISION_FILENAME_REGEX = re.compile(r"(\d+)\.txt") - -logger = logging.getLogger(__name__) - - -class WikicommaImporter: - __slots__ = ( - "generator", - "directory", - "replace_colon", - ) - - def __init__(self, generator, directory, replace_colon=True): - self.generator = generator - self.directory = directory - self.replace_colon = replace_colon - - def process_all(self): - logger.info("Processing all sites") - self.generator.section_sql("Wikicomma") - self.generator.section_sh("Files") - - for site_slug in os.listdir(self.directory): - self.process_site(site_slug) - - def process_site(self, site_slug): - logger.info("Processing site %s", site_slug) - self.generator.section_sql(f"Site: {site_slug}") - - # Add site - unknown_description = f"[NEEDS UPDATE] {site_slug}" - self.generator.add_site( - Site( - wikidot_id=None, - created_at=UNKNOWN_CREATION_DATE, - name=unknown_description, - slug=site_slug, - subtitle=unknown_description, - description=unknown_description, - ) - ) - - # Process site internals - site_directory = os.path.join(self.directory, site_slug) - self.process_site_pages(site_slug, site_directory) - self.process_site_forum(site_slug, site_directory) - - def process_site_pages(self, site_slug: str, site_directory: str): - page_mapping = self.read_json(site_directory, "meta", "page_id_map.json") - file_mapping = self.read_json(site_directory, "meta", "file_map.json") - logger.info("Processing %d pages", len(page_mapping)) - - def get_first_last_revisions(revisions: List[dict]): - # Since the revision list isn't always in order... 
- start_revision = revisions[0] - last_revision = revisions[0] - - for revision in revisions: - if revision["revision"] < start_revision["revision"]: - start_revision = revision - - if revision["revision"] > last_revision["revision"]: - last_revision = revision - - return start_revision, last_revision - - for page_id, page_slug in page_mapping.items(): - self.generator.section_sql(f"Page: {page_slug}") - page_id = int(page_id) - metadata = self.read_page_metadata(site_directory, page_slug) - start_revision, last_revision = get_first_last_revisions( - metadata["revisions"] - ) - created_at = datetime.fromtimestamp(start_revision["stamp"]) - updated_at = datetime.fromtimestamp(last_revision["stamp"]) - site_id = -1 # TODO unknown - - self.generator.add_page( - Page( - wikidot_id=page_id, - created_at=created_at, - updated_at=updated_at, - site_id=site_id, - title=metadata.get("title", ""), - slug=page_slug, - discussion_thread_id=None, # TODO unknown - ) - ) - self.generator.add_page_lock(page_id, metadata.get("is_locked", False)) - self.process_page_revisions(site_directory, site_id, metadata) - self.process_page_files( - site_directory, - page_id, - file_mapping, - metadata["files"], - ) - self.process_page_votes(metadata) - - def process_page_revisions(self, site_directory: str, site_id: int, metadata: dict): - page_slug = metadata["name"] - page_id = metadata["page_id"] - # NOTE: We don't know what these are historically, - title = metadata.get("title", "") - tags = metadata.get("tags", []) - logger.info("Processing revisions for page %s (%d)", page_slug, page_id) - - wikitext_mapping = {} - with self.open_page_revisions(site_directory, page_slug) as archive: - for filename, data in archive.readall().items(): - match = REVISION_FILENAME_REGEX.fullmatch(filename) - revision_number = int(match[1]) - wikitext = data.read().decode("utf-8") - - for revision in metadata["revisions"]: - revision_number = revision["revision"] - user_spec = revision["author"] - logger.debug("Processing revision number %d", revision_number) - - # Is user slug, not a user ID - if isinstance(user_spec, str): - # TODO get ID - logger.warn("Skipping revision, unknown user: %s", user_spec) - continue - - wikitext = wikitext_mapping.get(revision_number) - if wikitext is None: - logger.error("No wikitext found for revision number %d", revision_number) - continue - - self.generator.add_page_revision( - PageRevision( - wikidot_id=revision["global_revision"], - revision_number=revision_number, - created_at=datetime.fromtimestamp(revision["stamp"]), - flags=revision["flags"], - page_id=page_id, - site_id=site_id, - user_id=user_spec, - wikitext=wikitext, - slug=page_slug, - title=title, - html="", # TODO not stored - tags=tags, - comments=revision["commentary"], - ) - ) - - def process_page_files( - self, - site_directory: str, - page_id: int, - file_mapping: dict, - metadata_list: list, - ): - logger.info("Processing files for page ID %d", page_id) - - for metadata in metadata_list: - file_id = metadata["file_id"] - logger.debug("Processing file ID %d", file_id) - - user_spec = metadata["author"] - # Is user slug, not a user ID - if isinstance(user_spec, str): - # TODO get ID - logger.warn("Skipping file, unknown user: %s", user_spec) - continue - - file_location = file_mapping[str(file_id)] - file_path = os.path.join(site_directory, "files", file_location["path"]) - if not os.path.exists(file_path): - logger.error("Path %s does not exist", file_path) - continue - - with open(file_path, "rb") as file: - file_data = 
file.read() - - self.generator.add_file( - File( - wikidot_id=metadata["file_id"], - page_id=page_id, - name=metadata["name"], - mime=metadata["mime"], - size=metadata["size_bytes"], - user_id=user_spec, - created_at=datetime.fromtimestamp(metadata["stamp"]), - ) - ) - - def process_page_votes(self, metadata: dict): - logger.info("Processing %d votes", len(metadata["votings"])) - - for (user_spec, value) in metadata["votings"]: - logger.debug("Processing vote by %s", user_spec) - - # Is user slug, not a user ID - if isinstance(user_spec, str): - # TODO get ID - logger.warn("Skipping vote, unknown user: %s", user_spec) - continue - - # Get vote value - if isinstance(value, bool): - value = +1 if value else -1 - - self.generator.add_page_vote( - PageVote( - page_id=metadata["page_id"], - user_id=user_spec, - value=value, - ) - ) - - def process_site_forum(self, site_slug: str, site_directory: str): - logger.info("Processing forum posts for site %s", site_slug) - self.generator.section_sql(f"Forum: {site_slug} [TODO]") - # TODO - - def read_page_metadata(self, site_directory: str, page_slug: str): - page_metadata_filename = f"{page_slug}.json" - - if self.replace_colon: - page_metadata_filename = page_metadata_filename.replace(":", "_") - - page_metadata = self.read_json( - site_directory, - "meta", - "pages", - page_metadata_filename, - ) - - assert page_metadata["name"] == page_slug - return page_metadata - - def open_page_revisions(self, site_directory: str, page_slug: str): - page_revisions_filename = f"{page_slug}.7z" - - if self.replace_colon: - page_revisions_filename = page_revisions_filename.replace(":", "_") - - page_revisions_path = os.path.join( - site_directory, "pages", page_revisions_filename, - ) - return SevenZipFile(page_revisions_path, "r") - - @staticmethod - def read_json(*path_parts): - path = os.path.join(*path_parts) - - with open(path) as file: - return json.load(file) - - -def run_wikicomma_import( - *, - wikicomma_directory: str, - sql_path: str, - sh_path: str, - s3_bucket: str, - postgres_url: str, - last_page_category_id: int = 0, -): - wikicomma_directory = os.path.normpath(wikicomma_directory) - - def runner(generator): - importer = WikicommaImporter(generator, wikicomma_directory) - importer.process_all() - - generate_seed( - runner, - sql_path=sql_path, - sh_path=sh_path, - s3_bucket=s3_bucket, - postgres_url=postgres_url, - last_page_category_id=last_page_category_id, - ) diff --git a/deepwell/scripts/wikicomma_import.py b/deepwell/scripts/wikicomma_import.py deleted file mode 100755 index 8fb095abdd..0000000000 --- a/deepwell/scripts/wikicomma_import.py +++ /dev/null @@ -1,84 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import logging -import sys -from importer import run_wikicomma_import - -LOG_FORMAT = "[%(levelname)s] %(asctime)s %(name)s: %(message)s" -LOG_DATE_FORMAT = "[%Y/%m/%d %H:%M:%S]" - -if __name__ == "__main__": - argparser = argparse.ArgumentParser(description="WikiComma importer") - argparser.add_argument( - "-q", - "--quiet", - "--no-stdout", - dest="stdout", - action="store_false", - help="Don't output to standard out.", - ) - argparser.add_argument( - "-D", - "--debug", - dest="debug", - action="store_true", - help="Set logging level to debug.", - ) - argparser.add_argument( - "-d", - "--directory", - "--wikicomma-directory", - dest="wikicomma_directory", - required=True, - help="The directory where WikiComma data resides", - ) - argparser.add_argument( - "-o", - "--sql", - "--output-sql", - dest="sql_path", - required=True, - 
help="The location to output the SQL dump to", - ) - argparser.add_argument( - "-s", - "--shell", - "--output-shell", - dest="sh_path", - required=True, - help="The location to output the shell dump to", - ) - argparser.add_argument( - "-b", - "--s3", - "--s3-bucket", - dest="s3_bucket", - required=True, - help="The name of the S3 bucket to use (read-only)", - ) - argparser.add_argument( - "-u", - "--postgres-url", - dest="postgres_url", - required=True, - help="The DEEPWELL database to connect to (read-only)", - ) - args = argparser.parse_args() - - log_fmtr = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATE_FORMAT) - log_stdout = logging.StreamHandler(sys.stdout) - log_stdout.setFormatter(log_fmtr) - log_level = logging.DEBUG if args.debug else logging.INFO - - logger = logging.getLogger("importer") - logger.setLevel(level=log_level) - logger.addHandler(log_stdout) - - run_wikicomma_import( - wikicomma_directory=args.wikicomma_directory, - sql_path=args.sql_path, - sh_path=args.sh_path, - s3_bucket=args.s3_bucket, - postgres_url=args.postgres_url, - ) diff --git a/deepwell/src/macros.rs b/deepwell/src/macros.rs index 52493cf287..48dd7ed965 100644 --- a/deepwell/src/macros.rs +++ b/deepwell/src/macros.rs @@ -37,7 +37,6 @@ macro_rules! str_write { /// This is done because the only failure mode for writing to a `String` /// would be insufficient memory, which would cause an abort anyways. /// -/// /// # See also /// * [`str_write!`](macro.str_write.html) macro_rules! str_writeln {