From 223f57bc79cd1b89dd47a467733a7d2170120f6c Mon Sep 17 00:00:00 2001 From: sushichan044 Date: Thu, 10 Oct 2024 15:14:38 +0900 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20X=20API=E3=81=8B=E3=82=89Post?= =?UTF-8?q?=E3=82=92=E5=8F=96=E5=BE=97=E3=81=99=E3=82=8B=E9=9A=9B=E3=81=AB?= =?UTF-8?q?Media=E6=83=85=E5=A0=B1=E3=82=92=E4=BF=9D=E5=AD=98=E3=81=99?= =?UTF-8?q?=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/birdxplorer_common/storage.py | 12 ++++++++++++ etl/src/birdxplorer_etl/extract.py | 23 +++++++++++++++++++---- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/common/birdxplorer_common/storage.py b/common/birdxplorer_common/storage.py index 6eee4af..5229dc1 100644 --- a/common/birdxplorer_common/storage.py +++ b/common/birdxplorer_common/storage.py @@ -223,6 +223,18 @@ class RowPostRecord(Base): user: Mapped["RowUserRecord"] = relationship("RowUserRecord", back_populates="row_post") +class RowPostMediaRecord(Base): + __tablename__ = "row_post_media" + + media_key: Mapped[String] = mapped_column(primary_key=True) + + url: Mapped[String] = mapped_column(nullable=False) + type: Mapped[MediaType] = mapped_column(nullable=False) + width: Mapped[NonNegativeInt] = mapped_column(nullable=False) + height: Mapped[NonNegativeInt] = mapped_column(nullable=False) + + post_id: Mapped[PostId] = mapped_column(ForeignKey("row_posts.post_id"), nullable=False) + class RowPostEmbedURLRecord(Base): __tablename__ = "row_post_embed_urls" diff --git a/etl/src/birdxplorer_etl/extract.py b/etl/src/birdxplorer_etl/extract.py index b29c5de..b268b9c 100644 --- a/etl/src/birdxplorer_etl/extract.py +++ b/etl/src/birdxplorer_etl/extract.py @@ -7,6 +7,7 @@ from lib.x.postlookup import lookup from birdxplorer_common.storage import ( RowNoteRecord, + RowPostMediaRecord, RowPostRecord, RowUserRecord, RowNoteStatusRecord, @@ -145,16 +146,17 @@ def extract_data(db: Session): db.add(db_user) media_data = ( - post["includes"]["media"][0] + post["includes"]["media"] if "includes" in post and "media" in post["includes"] and len(post["includes"]["media"]) > 0 - else {} + else [{}] ) + db_post = RowPostRecord( post_id=post["data"]["id"], author_id=post["data"]["author_id"], text=post["data"]["text"], - media_type=media_data.get("type", ""), - media_url=media_data.get("url", ""), + media_type=media_data[0].get("type", ""), + media_url=media_data[0].get("url", ""), created_at=created_at_millis, like_count=post["data"]["public_metrics"]["like_count"], repost_count=post["data"]["public_metrics"]["retweet_count"], @@ -166,6 +168,19 @@ def extract_data(db: Session): ) db.add(db_post) + media_recs = [ + RowPostMediaRecord( + media_key=m["media_key"], + type=m["type"], + url=m["url"], + width=m["width"], + height=m["height"], + post_id=post["data"]["id"], + ) + for m in media_data + ] + db.add_all(media_recs) + if "entities" in post["data"] and "urls" in post["data"]["entities"]: for url in post["data"]["entities"]["urls"]: if "unwound_url" in url: From 5229c676ec536713025cf8983431281190ed5e5c Mon Sep 17 00:00:00 2001 From: sushichan044 Date: Thu, 10 Oct 2024 21:03:58 +0900 Subject: [PATCH 2/2] =?UTF-8?q?ETL=E3=81=AE=E3=83=A1=E3=83=87=E3=82=A3?= =?UTF-8?q?=E3=82=A2=E6=83=85=E5=A0=B1=E3=82=92csv=E3=81=AB=E6=9B=B8?= =?UTF-8?q?=E3=81=8D=E5=87=BA=E3=81=9B=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB?= =?UTF-8?q?=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etl/src/birdxplorer_etl/transform.py | 68 ++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 9 deletions(-) diff --git a/etl/src/birdxplorer_etl/transform.py b/etl/src/birdxplorer_etl/transform.py index bd977f9..f4c9fc5 100644 --- a/etl/src/birdxplorer_etl/transform.py +++ b/etl/src/birdxplorer_etl/transform.py @@ -1,22 +1,27 @@ -from sqlalchemy import select, func, and_, Integer +import csv +import os +import random +import uuid +from pathlib import Path +from typing import Generator + +from prefect import get_run_logger +from sqlalchemy import Integer, and_, func, select from sqlalchemy.orm import Session + from birdxplorer_common.storage import ( RowNoteRecord, - RowPostRecord, - RowUserRecord, RowNoteStatusRecord, RowPostEmbedURLRecord, + RowPostMediaRecord, + RowPostRecord, + RowUserRecord, ) from birdxplorer_etl.lib.ai_model.ai_model_interface import get_ai_service from birdxplorer_etl.settings import ( - TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND, TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND, + TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND, ) -import csv -import os -from prefect import get_run_logger -import uuid -import random def transform_data(db: Session): @@ -147,6 +152,7 @@ def transform_data(db: Session): offset += limit # Transform row post embed link + write_media_csv(db) generate_post_link(db) # Transform row post embed url data and generate post_embed_url.csv @@ -178,6 +184,50 @@ def transform_data(db: Session): return +def write_media_csv(db: Session) -> None: + media_csv_path = Path("./data/transformed/media.csv") + post_media_association_csv_path = Path("./data/transformed/post_media_association.csv") + + if media_csv_path.exists(): + media_csv_path.unlink(missing_ok=True) + if post_media_association_csv_path.exists(): + post_media_association_csv_path.unlink(missing_ok=True) + + with ( + media_csv_path.open("a", newline="", encoding="utf-8") as media_csv, + post_media_association_csv_path.open("a", newline="", encoding="utf-8") as assoc_csv, + ): + media_fields = ["media_key", "type", "url", "width", "height", "post_id"] + media_writer = csv.DictWriter(media_csv, fieldnames=media_fields) + media_writer.writeheader() + assoc_fields = ["post_id", "media_key"] + assoc_writer = csv.DictWriter(assoc_csv, fieldnames=assoc_fields) + assoc_writer.writeheader() + + for m in _iterate_media(db): + media_writer.writerow( + { + "media_key": m.media_key, + "type": m.type, + "url": m.url, + "width": m.width, + "height": m.height, + "post_id": m.post_id, + } + ) + assoc_writer.writerow({"post_id": m.post_id, "media_key": m.media_key}) + + +def _iterate_media(db: Session, limit: int = 1000) -> Generator[RowPostMediaRecord, None, None]: + offset = 0 + total_media: int = db.query(func.count(RowPostMediaRecord.media_key)).scalar() or 0 + + while offset < total_media: + yield from db.query(RowPostMediaRecord).limit(limit).offset(offset) + + offset += limit + + def generate_post_link(db: Session): link_csv_file_path = "./data/transformed/post_link.csv" association_csv_file_path = "./data/transformed/post_link_association.csv"