Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ETL: 投稿への埋め込みリンクのCSV生成 #111

Merged
merged 6 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/birdxplorer_api/routers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def get_notes(
created_at_to: Union[None, TwitterTimestamp] = Query(default=None),
topic_ids: Union[List[TopicId], None] = Query(default=None),
post_ids: Union[List[PostId], None] = Query(default=None),
current_status: Union[None, List[str]] = Query(default=None),
language: Union[LanguageIdentifier, None] = Query(default=None),
) -> NoteListResponse:
return NoteListResponse(
Expand All @@ -86,6 +87,7 @@ def get_notes(
created_at_to=created_at_to,
topic_ids=topic_ids,
post_ids=post_ids,
current_status=current_status,
language=language,
)
)
Expand Down
8 changes: 8 additions & 0 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="ja",
summary="要約文1",
current_status="NEEDS_MORE_RATINGS",
created_at=1152921600000,
),
note_factory.build(
Expand All @@ -118,6 +119,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[],
language="en",
summary="summary2",
current_status="NEEDS_MORE_RATINGS",
created_at=1152921601000,
),
note_factory.build(
Expand All @@ -126,6 +128,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[1]],
language="en",
summary="summary3",
current_status="",
created_at=1152921602000,
),
note_factory.build(
Expand All @@ -134,6 +137,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0], topic_samples[1], topic_samples[2]],
language="en",
summary="summary4",
current_status="CURRENTLY_RATED_HELPFUL",
created_at=1152921603000,
),
note_factory.build(
Expand All @@ -142,6 +146,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="en",
summary="summary5",
current_status="CURRENTLY_RATED_HELPFUL",
created_at=1152921604000,
),
]
Expand Down Expand Up @@ -285,6 +290,7 @@ def _get_notes(
created_at_to: Union[None, TwitterTimestamp] = None,
topic_ids: Union[List[TopicId], None] = None,
post_ids: Union[List[PostId], None] = None,
current_status: Union[None, List[str]] = None,
language: Union[LanguageIdentifier, None] = None,
) -> Generator[Note, None, None]:
for note in note_samples:
Expand All @@ -298,6 +304,8 @@ def _get_notes(
continue
if post_ids is not None and note.post_id not in post_ids:
continue
if current_status is not None and note.current_status not in current_status:
continue
if language is not None and note.language != language:
continue
yield note
Expand Down
1 change: 1 addition & 0 deletions common/birdxplorer_common/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ class Note(BaseModel):
language: LanguageIdentifier
topics: List[Topic]
summary: SummaryString
current_status: str | None
created_at: TwitterTimestamp


Expand Down
42 changes: 42 additions & 0 deletions common/birdxplorer_common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class NoteRecord(Base):
topics: Mapped[List[NoteTopicAssociation]] = relationship()
language: Mapped[LanguageIdentifier] = mapped_column(nullable=False)
summary: Mapped[SummaryString] = mapped_column(nullable=False)
current_status: Mapped[String] = mapped_column(nullable=True)
created_at: Mapped[TwitterTimestamp] = mapped_column(nullable=False)


Expand Down Expand Up @@ -151,6 +152,34 @@ class RowNoteRecord(Base):
row_post: Mapped["RowPostRecord"] = relationship("RowPostRecord", back_populates="row_notes")


class RowNoteStatusRecord(Base):
    """ORM mapping for the ``row_note_status`` table (one status row per raw note).

    The ETL extract step builds instances with ``RowNoteStatusRecord(**row)``
    where ``row`` comes from a TSV whose header names have been snake-cased, so
    every attribute name below has to match a snake-cased TSV column name
    exactly. Empty TSV cells are converted to ``None`` before construction,
    which is why every column except the first three is nullable.
    """

    __tablename__ = "row_note_status"

    # Primary key; also a foreign key to row_notes.note_id.
    note_id: Mapped[NoteId] = mapped_column(ForeignKey("row_notes.note_id"), primary_key=True)
    # Required columns.
    note_author_participant_id: Mapped[ParticipantId] = mapped_column(nullable=False)
    created_at_millis: Mapped[TwitterTimestamp] = mapped_column(nullable=False)
    # Optional status-history columns. "n_m_r" presumably stands for the
    # NEEDS_MORE_RATINGS status after snake-casing "NMR" -- TODO confirm.
    timestamp_millis_of_first_non_n_m_r_status: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
    first_non_n_m_r_status: Mapped[String] = mapped_column(nullable=True)
    timestamp_millis_of_current_status: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
    current_status: Mapped[String] = mapped_column(nullable=True)
    timestamp_millis_of_latest_non_n_m_r_status: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
    most_recent_non_n_m_r_status: Mapped[String] = mapped_column(nullable=True)
    timestamp_millis_of_status_lock: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
    locked_status: Mapped[String] = mapped_column(nullable=True)
    timestamp_millis_of_retro_lock: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
    current_core_status: Mapped[String] = mapped_column(nullable=True)
    current_expansion_status: Mapped[String] = mapped_column(nullable=True)
    current_group_status: Mapped[String] = mapped_column(nullable=True)
    current_decided_by: Mapped[String] = mapped_column(nullable=True)
    # NOTE(review): annotated as int but declared nullable -- values may be None at runtime.
    current_modeling_group: Mapped[int] = mapped_column(nullable=True)
    timestamp_millis_of_most_recent_status_change: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
    timestamp_millis_of_nmr_due_to_min_stable_crh_time: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
    current_multi_group_status: Mapped[String] = mapped_column(nullable=True)
    # NOTE(review): annotated as int but declared nullable -- values may be None at runtime.
    current_modeling_multi_group: Mapped[int] = mapped_column(nullable=True)
    # NOTE(review): the name says "minute" while the other timestamp columns say
    # "millis"; the unit is not visible here -- confirm before comparing values.
    timestamp_minute_of_final_scoring_output: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
    timestamp_millis_of_first_nmr_due_to_min_stable_crh_time: Mapped[TwitterTimestamp] = mapped_column(nullable=True)


class RowPostRecord(Base):
__tablename__ = "row_posts"

Expand All @@ -171,6 +200,15 @@ class RowPostRecord(Base):
user: Mapped["RowUserRecord"] = relationship("RowUserRecord", back_populates="row_post")


class RowPostEmbedURLRecord(Base):
    """ORM mapping for the ``row_post_embed_urls`` table (URLs embedded in a raw post).

    The primary key is the composite ``(post_id, url)``, so one post can have
    several rows, one per distinct ``url`` value.
    """

    __tablename__ = "row_post_embed_urls"

    # First half of the composite primary key; foreign key to row_posts.post_id.
    post_id: Mapped[PostId] = mapped_column(ForeignKey("row_posts.post_id"), primary_key=True)
    # Second half of the composite primary key.
    url: Mapped[String] = mapped_column(primary_key=True)
    # NOTE(review): both columns below are NOT NULL, but the ETL extract step
    # maps falsy "url" / "expanded_url" / "unwound_url" values to None before
    # constructing this record -- confirm such rows cannot reach the database.
    expanded_url: Mapped[String] = mapped_column(nullable=False)
    unwound_url: Mapped[String] = mapped_column(nullable=False)


class RowUserRecord(Base):
__tablename__ = "row_users"

Expand Down Expand Up @@ -244,6 +282,7 @@ def get_notes(
created_at_to: Union[None, TwitterTimestamp] = None,
topic_ids: Union[List[TopicId], None] = None,
post_ids: Union[List[PostId], None] = None,
current_status: Union[None, List[str]] = None,
language: Union[LanguageIdentifier, None] = None,
) -> Generator[NoteModel, None, None]:
with Session(self.engine) as sess:
Expand All @@ -268,6 +307,8 @@ def get_notes(
query = query.filter(NoteRecord.post_id.in_(post_ids))
if language is not None:
query = query.filter(NoteRecord.language == language)
if current_status is not None:
query = query.filter(NoteRecord.current_status.in_(current_status))
for note_record in query.all():
yield NoteModel(
note_id=note_record.note_id,
Expand All @@ -285,6 +326,7 @@ def get_notes(
],
language=LanguageIdentifier.normalize(note_record.language),
summary=note_record.summary,
current_status=note_record.current_status,
created_at=note_record.created_at,
)

Expand Down
6 changes: 6 additions & 0 deletions common/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="ja",
summary="要約文1",
current_status=None,
created_at=1152921600000,
),
note_factory.build(
Expand All @@ -153,6 +154,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[],
language="en",
summary="summary2",
current_status=None,
created_at=1152921601000,
),
note_factory.build(
Expand All @@ -161,6 +163,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[1]],
language="en",
summary="summary3",
current_status=None,
created_at=1152921602000,
),
note_factory.build(
Expand All @@ -169,6 +172,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0], topic_samples[1], topic_samples[2]],
language="en",
summary="summary4",
current_status=None,
created_at=1152921603000,
),
note_factory.build(
Expand All @@ -177,6 +181,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="en",
summary="summary5",
current_status=None,
created_at=1152921604000,
),
note_factory.build(
Expand All @@ -185,6 +190,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="en",
summary="summary6_empty_post_id",
current_status=None,
created_at=1152921604000,
),
]
Expand Down
12 changes: 12 additions & 0 deletions common/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,18 @@ def test_get_notes_by_post_ids_empty(
assert expected == actual


def test_get_notes_by_note_status(
    engine_for_test: Engine,
    note_samples: List[Note],
    note_records_sample: List[NoteRecord],
) -> None:
    """Filtering ``Storage.get_notes`` by ``current_status`` returns only matching notes."""
    wanted_statuses = ["NEEDS_MORE_RATINGS"]

    # Query the storage layer with the status filter applied.
    fetched = list(Storage(engine=engine_for_test).get_notes(current_status=wanted_statuses))

    # Build the expectation by applying the same membership filter to the samples.
    # NOTE(review): the note_samples fixture in common/tests/conftest.py appears
    # to set current_status=None on every sample, which would make this list
    # empty -- confirm the fixtures actually exercise the filter.
    matching_samples = [sample for sample in note_samples if sample.current_status in wanted_statuses]

    assert matching_samples == fetched


def test_get_notes_by_language(
engine_for_test: Engine,
note_samples: List[Note],
Expand Down
14 changes: 11 additions & 3 deletions etl/.env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
X_BEARER_TOKEN=
AI_MODEL=

COMMUNITY_NOTE_DAYS_AGO=3

TARGET_TWITTER_POST_START_UNIX_MILLISECOND=1719851000000
TARGET_TWITTER_POST_END_UNIX_MILLISECOND=1719891000000

AI_MODEL=openai
OPENAPI_TOKEN=
CLAUDE_TOKEN=
TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND=1720900800000
TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND=1722110400000
TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND=1719851000000
TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND=1719891000000

USE_DUMMY_DATA=False
66 changes: 60 additions & 6 deletions etl/src/birdxplorer_etl/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from prefect import get_run_logger
from sqlalchemy.orm import Session
from lib.x.postlookup import lookup
from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord
from birdxplorer_common.storage import (
RowNoteRecord,
RowPostRecord,
RowUserRecord,
RowNoteStatusRecord,
RowPostEmbedURLRecord,
)
import settings


Expand All @@ -28,9 +34,16 @@ def extract_data(db: Session):
> datetime.timestamp(date) - 24 * 60 * 60 * settings.COMMUNITY_NOTE_DAYS_AGO
):
break
url = f'https://ton.twimg.com/birdwatch-public-data/{date.strftime("%Y/%m/%d")}/notes/notes-00000.tsv'
logger.info(url)
res = requests.get(url)

dateString = date.strftime("%Y/%m/%d")
note_url = f"https://ton.twimg.com/birdwatch-public-data/{dateString}/notes/notes-00000.tsv"
if settings.USE_DUMMY_DATA:
note_url = (
"https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/notes_sample.tsv"
)

logger.info(note_url)
res = requests.get(note_url)

if res.status_code == 200:
# res.contentをdbのNoteテーブル
Expand All @@ -39,13 +52,43 @@ def extract_data(db: Session):
reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

rows_to_add = []
for row in reader:
for index, row in enumerate(reader):
if db.query(RowNoteRecord).filter(RowNoteRecord.note_id == row["note_id"]).first():
continue
rows_to_add.append(RowNoteRecord(**row))
if index % 1000 == 0:
db.bulk_save_objects(rows_to_add)
rows_to_add = []
db.bulk_save_objects(rows_to_add)

break
status_url = f"https://ton.twimg.com/birdwatch-public-data/{dateString}/noteStatusHistory/noteStatusHistory-00000.tsv"
if settings.USE_DUMMY_DATA:
status_url = "https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/noteStatus_sample.tsv"

logger.info(status_url)
res = requests.get(status_url)

if res.status_code == 200:
tsv_data = res.content.decode("utf-8").splitlines()
reader = csv.DictReader(tsv_data, delimiter="\t")
reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

rows_to_add = []
for index, row in enumerate(reader):
for key, value in list(row.items()):
if value == "":
row[key] = None
status = db.query(RowNoteStatusRecord).filter(RowNoteStatusRecord.note_id == row["note_id"]).first()
if status is None or status.created_at_millis > int(datetime.now().timestamp() * 1000):
db.query(RowNoteStatusRecord).filter(RowNoteStatusRecord.note_id == row["note_id"]).delete()
rows_to_add.append(RowNoteStatusRecord(**row))
if index % 1000 == 0:
db.bulk_save_objects(rows_to_add)
rows_to_add = []
db.bulk_save_objects(rows_to_add)

break

date = date - timedelta(days=1)

db.commit()
Expand Down Expand Up @@ -122,6 +165,17 @@ def extract_data(db: Session):
lang=post["data"]["lang"],
)
db.add(db_post)

if "entities" in post["data"] and "urls" in post["data"]["entities"]:
for url in post["data"]["entities"]["urls"]:
if "unwound_url" in url:
post_url = RowPostEmbedURLRecord(
post_id=post["data"]["id"],
url=url["url"] if url["url"] else None,
expanded_url=url["expanded_url"] if url["expanded_url"] else None,
unwound_url=url["unwound_url"] if url["unwound_url"] else None,
)
db.add(post_url)
note.row_post_id = tweet_id
db.commit()
continue
Expand Down
14 changes: 13 additions & 1 deletion etl/src/birdxplorer_etl/lib/sqlite/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker

from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord
from birdxplorer_common.storage import (
RowNoteRecord,
RowPostRecord,
RowUserRecord,
RowPostEmbedURLRecord,
RowNoteStatusRecord,
)


def init_db():
Expand All @@ -24,9 +30,15 @@ def init_db():
if not inspect(engine).has_table("row_posts"):
logger.info("Creating table post")
RowPostRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_note_status"):
logger.info("Creating table note_status")
RowNoteStatusRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_users"):
logger.info("Creating table user")
RowUserRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_post_embed_urls"):
logger.info("Creating table post_embed_urls")
RowPostEmbedURLRecord.metadata.create_all(engine)

Session = sessionmaker(bind=engine)

Expand Down
2 changes: 2 additions & 0 deletions etl/src/birdxplorer_etl/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@
CLAUDE_TOKEN = os.getenv("CLAUDE_TOKEN")
TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND = os.getenv("TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND")
TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND = os.getenv("TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND")

# When enabled, the extract step downloads sample TSV files instead of the live
# public data. The value is parsed case-insensitively and accepts the usual
# truthy spellings, so that e.g. `USE_DUMMY_DATA=true` or `USE_DUMMY_DATA=1` in
# .env is not silently treated as disabled. Unset or anything else means False.
USE_DUMMY_DATA = os.getenv("USE_DUMMY_DATA", "False").strip().lower() in {"true", "1", "yes", "on"}
Loading
Loading