Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
yu23ki14 committed Oct 1, 2024
1 parent 260f73b commit b493e3f
Show file tree
Hide file tree
Showing 9 changed files with 589 additions and 7 deletions.
2 changes: 2 additions & 0 deletions api/birdxplorer_api/routers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def get_notes(
created_at_to: Union[None, TwitterTimestamp] = Query(default=None),
topic_ids: Union[List[TopicId], None] = Query(default=None),
post_ids: Union[List[PostId], None] = Query(default=None),
current_status: Union[None, List[str]] = Query(default=None),
language: Union[LanguageIdentifier, None] = Query(default=None),
) -> NoteListResponse:
return NoteListResponse(
Expand All @@ -86,6 +87,7 @@ def get_notes(
created_at_to=created_at_to,
topic_ids=topic_ids,
post_ids=post_ids,
current_status=current_status,
language=language,
)
)
Expand Down
8 changes: 8 additions & 0 deletions api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="ja",
summary="要約文1",
current_status="NEEDS_MORE_RATINGS",
created_at=1152921600000,
),
note_factory.build(
Expand All @@ -101,6 +102,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[],
language="en",
summary="summary2",
current_status="NEEDS_MORE_RATINGS",
created_at=1152921601000,
),
note_factory.build(
Expand All @@ -109,6 +111,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[1]],
language="en",
summary="summary3",
current_status="",
created_at=1152921602000,
),
note_factory.build(
Expand All @@ -117,6 +120,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0], topic_samples[1], topic_samples[2]],
language="en",
summary="summary4",
current_status="CURRENTLY_RATED_HELPFUL",
created_at=1152921603000,
),
note_factory.build(
Expand All @@ -125,6 +129,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="en",
summary="summary5",
current_status="CURRENTLY_RATED_HELPFUL",
created_at=1152921604000,
),
]
Expand Down Expand Up @@ -234,6 +239,7 @@ def _get_notes(
created_at_to: Union[None, TwitterTimestamp] = None,
topic_ids: Union[List[TopicId], None] = None,
post_ids: Union[List[PostId], None] = None,
current_status: Union[None, List[str]] = None,
language: Union[LanguageIdentifier, None] = None,
) -> Generator[Note, None, None]:
for note in note_samples:
Expand All @@ -247,6 +253,8 @@ def _get_notes(
continue
if post_ids is not None and note.post_id not in post_ids:
continue
if current_status is not None and note.current_status not in current_status:
continue
if language is not None and note.language != language:
continue
yield note
Expand Down
1 change: 1 addition & 0 deletions common/birdxplorer_common/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ class Note(BaseModel):
language: LanguageIdentifier
topics: List[Topic]
summary: SummaryString
current_status: str | None
created_at: TwitterTimestamp


Expand Down
42 changes: 42 additions & 0 deletions common/birdxplorer_common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class NoteRecord(Base):
topics: Mapped[List[NoteTopicAssociation]] = relationship()
language: Mapped[LanguageIdentifier] = mapped_column(nullable=False)
summary: Mapped[SummaryString] = mapped_column(nullable=False)
current_status: Mapped[String] = mapped_column(nullable=True)
created_at: Mapped[TwitterTimestamp] = mapped_column(nullable=False)


Expand Down Expand Up @@ -132,6 +133,34 @@ class RowNoteRecord(Base):
row_post: Mapped["RowPostRecord"] = relationship("RowPostRecord", back_populates="row_notes")


class RowNoteStatusRecord(Base):
    """ORM mapping for the ``row_note_status`` table.

    Each row holds the status fields of one note. ``note_id`` is both the
    primary key and a foreign key to ``row_notes.note_id``, so there is at
    most one status row per note.
    """

    # NOTE(review): the attribute names look like snake-cased column headers
    # of the Community Notes "noteStatusHistory" TSV; presumably rows are
    # built directly from that file, so renaming an attribute would break the
    # load -- confirm against the ETL extract step.
    # NOTE(review): "n_m_r" presumably abbreviates the NEEDS_MORE_RATINGS
    # status -- confirm.

    __tablename__ = "row_note_status"

    # Identity of the note and its author; these are the only columns that
    # declare their nullability explicitly.
    note_id: Mapped[NoteId] = mapped_column(ForeignKey("row_notes.note_id"), primary_key=True)
    note_author_participant_id: Mapped[ParticipantId] = mapped_column(nullable=False)
    created_at_millis: Mapped[TwitterTimestamp] = mapped_column(nullable=False)

    # NOTE(review): every column below uses a bare mapped_column(), so its
    # nullability is whatever SQLAlchemy derives from the annotation
    # (a non-Optional Mapped[...] normally means NOT NULL) -- confirm this is
    # intended for fields that may be empty in the upstream data.

    # Status values paired with the timestamp column that precedes each one.
    timestamp_millis_of_first_non_n_m_r_status: Mapped[TwitterTimestamp] = mapped_column()
    first_non_n_m_r_status: Mapped[String] = mapped_column()
    timestamp_millis_of_current_status: Mapped[TwitterTimestamp] = mapped_column()
    current_status: Mapped[String] = mapped_column()
    timestamp_millis_of_latest_non_n_m_r_status: Mapped[TwitterTimestamp] = mapped_column()
    most_recent_non_n_m_r_status: Mapped[String] = mapped_column()
    timestamp_millis_of_status_lock: Mapped[TwitterTimestamp] = mapped_column()
    locked_status: Mapped[String] = mapped_column()
    timestamp_millis_of_retro_lock: Mapped[TwitterTimestamp] = mapped_column()

    # Per-model status fields and the modeling group stored as an integer.
    current_core_status: Mapped[String] = mapped_column()
    current_expansion_status: Mapped[String] = mapped_column()
    current_group_status: Mapped[String] = mapped_column()
    current_decided_by: Mapped[String] = mapped_column()
    current_modeling_group: Mapped[int] = mapped_column()

    # Remaining timestamps and the multi-group status fields.
    timestamp_millis_of_most_recent_status_change: Mapped[TwitterTimestamp] = mapped_column()
    timestamp_millis_of_nmr_due_to_min_stable_crh_time: Mapped[TwitterTimestamp] = mapped_column()
    current_multi_group_status: Mapped[String] = mapped_column()
    current_modeling_multi_group: Mapped[int] = mapped_column()
    # Named "minute" rather than "millis", unlike the other timestamp columns.
    timestamp_minute_of_final_scoring_output: Mapped[TwitterTimestamp] = mapped_column()
    timestamp_millis_of_first_nmr_due_to_min_stable_crh_time: Mapped[TwitterTimestamp] = mapped_column()


class RowPostRecord(Base):
__tablename__ = "row_posts"

Expand All @@ -152,6 +181,15 @@ class RowPostRecord(Base):
user: Mapped["RowUserRecord"] = relationship("RowUserRecord", back_populates="row_post")


class RowPostEmbedURLRecord(Base):
    """ORM mapping for the ``row_post_embed_urls`` table.

    Stores one row per URL attached to a post. The primary key is the
    composite ``(post_id, url)``, and ``post_id`` is a foreign key to
    ``row_posts.post_id``.
    """

    __tablename__ = "row_post_embed_urls"

    # Composite primary key: the owning post plus the URL string itself.
    post_id: Mapped[PostId] = mapped_column(ForeignKey("row_posts.post_id"), primary_key=True)
    url: Mapped[String] = mapped_column(primary_key=True)
    # Both alternative forms of the URL are required (NOT NULL).
    # NOTE(review): presumably these mirror the "expanded_url" and
    # "unwound_url" fields of a post's URL entities -- confirm against the
    # code that creates these records.
    expanded_url: Mapped[String] = mapped_column(nullable=False)
    unwound_url: Mapped[String] = mapped_column(nullable=False)


class RowUserRecord(Base):
__tablename__ = "row_users"

Expand Down Expand Up @@ -224,6 +262,7 @@ def get_notes(
created_at_to: Union[None, TwitterTimestamp] = None,
topic_ids: Union[List[TopicId], None] = None,
post_ids: Union[List[PostId], None] = None,
current_status: Union[None, List[str]] = None,
language: Union[LanguageIdentifier, None] = None,
) -> Generator[NoteModel, None, None]:
with Session(self.engine) as sess:
Expand All @@ -248,6 +287,8 @@ def get_notes(
query = query.filter(NoteRecord.post_id.in_(post_ids))
if language is not None:
query = query.filter(NoteRecord.language == language)
if current_status is not None:
query = query.filter(NoteRecord.current_status.in_(current_status))
for note_record in query.all():
yield NoteModel(
note_id=note_record.note_id,
Expand All @@ -265,6 +306,7 @@ def get_notes(
],
language=LanguageIdentifier.normalize(note_record.language),
summary=note_record.summary,
current_status=note_record.current_status,
created_at=note_record.created_at,
)

Expand Down
6 changes: 6 additions & 0 deletions common/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="ja",
summary="要約文1",
current_status=None,
created_at=1152921600000,
),
note_factory.build(
Expand All @@ -134,6 +135,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[],
language="en",
summary="summary2",
current_status=None,
created_at=1152921601000,
),
note_factory.build(
Expand All @@ -142,6 +144,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[1]],
language="en",
summary="summary3",
current_status=None,
created_at=1152921602000,
),
note_factory.build(
Expand All @@ -150,6 +153,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0], topic_samples[1], topic_samples[2]],
language="en",
summary="summary4",
current_status=None,
created_at=1152921603000,
),
note_factory.build(
Expand All @@ -158,6 +162,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="en",
summary="summary5",
current_status=None,
created_at=1152921604000,
),
note_factory.build(
Expand All @@ -166,6 +171,7 @@ def note_samples(note_factory: NoteFactory, topic_samples: List[Topic]) -> Gener
topics=[topic_samples[0]],
language="en",
summary="summary6_empty_post_id",
current_status=None,
created_at=1152921604000,
),
]
Expand Down
12 changes: 12 additions & 0 deletions common/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,18 @@ def test_get_notes_by_post_ids_empty(
assert expected == actual


def test_get_notes_by_note_status(
    engine_for_test: Engine,
    note_samples: List[Note],
    note_records_sample: List[NoteRecord],
) -> None:
    """Check that ``get_notes(current_status=...)`` returns the matching samples.

    The notes returned by storage must equal, in order, the sample notes
    whose ``current_status`` is one of the requested statuses.
    """
    # note_records_sample is requested but never referenced; presumably the
    # fixture is needed for its side effect of populating the test database
    # -- confirm in conftest.
    wanted_statuses = ["NEEDS_MORE_RATINGS"]
    store = Storage(engine=engine_for_test)

    fetched = list(store.get_notes(current_status=wanted_statuses))
    matching = [sample for sample in note_samples if sample.current_status in wanted_statuses]

    assert matching == fetched


def test_get_notes_by_language(
engine_for_test: Engine,
note_samples: List[Note],
Expand Down
55 changes: 49 additions & 6 deletions etl/src/birdxplorer_etl/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from prefect import get_run_logger
from sqlalchemy.orm import Session
from lib.x.postlookup import lookup
from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord
from birdxplorer_common.storage import (
RowNoteRecord,
RowPostRecord,
RowUserRecord,
RowPostEmbedURLRecord,
RowNoteStatusRecord,
)
import settings


Expand All @@ -28,9 +34,11 @@ def extract_data(db: Session):
> datetime.timestamp(date) - 24 * 60 * 60 * settings.COMMUNITY_NOTE_DAYS_AGO
):
break
url = f'https://ton.twimg.com/birdwatch-public-data/{date.strftime("%Y/%m/%d")}/notes/notes-00000.tsv'
logger.info(url)
res = requests.get(url)

dateString = date.strftime("%Y/%m/%d")
note_url = f"https://ton.twimg.com/birdwatch-public-data/{dateString}/notes/notes-00000.tsv"
logger.info(note_url)
res = requests.get(note_url)

if res.status_code == 200:
# Load res.content into the db Note table
Expand All @@ -45,7 +53,26 @@ def extract_data(db: Session):
rows_to_add.append(RowNoteRecord(**row))
db.bulk_save_objects(rows_to_add)

break
status_url = f"https://ton.twimg.com/birdwatch-public-data/{dateString}/noteStatusHistory/noteStatusHistory-00000.tsv"
logger.info(status_url)
res = requests.get(status_url)

if res.status_code == 200:
# Load res.content into the db NoteStatus table
tsv_data = res.content.decode("utf-8").splitlines()
reader = csv.DictReader(tsv_data, delimiter="\t")
reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

rows_to_add = []
for row in reader:
status = db.query(RowNoteStatusRecord).filter(RowNoteStatusRecord.note_id == row["note_id"]).first()
if status is None or status.created_at_millis > int(datetime.now().time() * 1000):
db.query(RowNoteStatusRecord).filter(RowNoteStatusRecord.note_id == row["note_id"]).delete()
rows_to_add.append(RowNoteStatusRecord(**row))
db.bulk_save_objects(rows_to_add)

break

date = date - timedelta(days=1)

db.commit()
Expand All @@ -58,7 +85,9 @@ def extract_data(db: Session):
.filter(RowNoteRecord.created_at_millis <= settings.TARGET_TWITTER_POST_END_UNIX_MILLISECOND)
.all()
)
logger.info(len(postExtract_targetNotes))

logger.info(f"Num of Target Notes: {len(postExtract_targetNotes)}")

for note in postExtract_targetNotes:
tweet_id = note.tweet_id

Expand All @@ -71,6 +100,8 @@ def extract_data(db: Session):
logger.info(tweet_id)
post = lookup(tweet_id)

logger.info(post)

if post == None or "data" not in post:
continue

Expand Down Expand Up @@ -101,6 +132,18 @@ def extract_data(db: Session):
)
db.add(db_user)

if "entities" in post["data"] and "urls" in post["data"]["entities"]:
for url in post["data"]["entities"]["urls"]:
if "unwound_url" not in url or url["status"] != 200:
continue
db_post_embed_url = RowPostEmbedURLRecord(
post_id=post["data"]["id"],
url=url["url"],
expanded_url=url["expanded_url"],
unwound_url=url["unwound_url"],
)
db.add(db_post_embed_url)

media_data = (
post["includes"]["media"][0]
if "includes" in post and "media" in post["includes"] and len(post["includes"]["media"]) > 0
Expand Down
14 changes: 13 additions & 1 deletion etl/src/birdxplorer_etl/lib/sqlite/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker

from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord
from birdxplorer_common.storage import (
RowNoteRecord,
RowPostRecord,
RowUserRecord,
RowPostEmbedURLRecord,
RowNoteStatusRecord,
)


def init_db():
Expand All @@ -24,9 +30,15 @@ def init_db():
if not inspect(engine).has_table("row_posts"):
logger.info("Creating table post")
RowPostRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_note_status"):
logger.info("Creating table note_status")
RowNoteStatusRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_users"):
logger.info("Creating table user")
RowUserRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_post_embed_urls"):
logger.info("Creating table post_embed_urls")
RowPostEmbedURLRecord.metadata.create_all(engine)

Session = sessionmaker(bind=engine)

Expand Down
Loading

0 comments on commit b493e3f

Please sign in to comment.