Skip to content

Commit

Permalink
add status
Browse files Browse the repository at this point in the history
  • Loading branch information
yu23ki14 committed Oct 2, 2024
1 parent df7ecb9 commit 24c6464
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 65 deletions.
40 changes: 20 additions & 20 deletions common/birdxplorer_common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,26 @@ class RowNoteStatusRecord(Base):
note_id: Mapped[NoteId] = mapped_column(ForeignKey("row_notes.note_id"), primary_key=True)
note_author_participant_id: Mapped[ParticipantId] = mapped_column(nullable=False)
created_at_millis: Mapped[TwitterTimestamp] = mapped_column(nullable=False)
timestamp_millis_of_first_non_n_m_r_status: Mapped[TwitterTimestamp] = mapped_column()
first_non_n_m_r_status: Mapped[String] = mapped_column()
timestamp_millis_of_current_status: Mapped[TwitterTimestamp] = mapped_column()
current_status: Mapped[String] = mapped_column()
timestamp_millis_of_latest_non_n_m_r_status: Mapped[TwitterTimestamp] = mapped_column()
most_recent_non_n_m_r_status: Mapped[String] = mapped_column()
timestamp_millis_of_status_lock: Mapped[TwitterTimestamp] = mapped_column()
locked_status: Mapped[String] = mapped_column()
timestamp_millis_of_retro_lock: Mapped[TwitterTimestamp] = mapped_column()
current_core_status: Mapped[String] = mapped_column()
current_expansion_status: Mapped[String] = mapped_column()
current_group_status: Mapped[String] = mapped_column()
current_decided_by: Mapped[String] = mapped_column()
current_modeling_group: Mapped[int] = mapped_column()
timestamp_millis_of_most_recent_status_change: Mapped[TwitterTimestamp] = mapped_column()
timestamp_millis_of_nmr_due_to_min_stable_crh_time: Mapped[TwitterTimestamp] = mapped_column()
current_multi_group_status: Mapped[String] = mapped_column()
current_modeling_multi_group: Mapped[int] = mapped_column()
timestamp_minute_of_final_scoring_output: Mapped[TwitterTimestamp] = mapped_column()
timestamp_millis_of_first_nmr_due_to_min_stable_crh_time: Mapped[TwitterTimestamp] = mapped_column()
timestamp_millis_of_first_non_n_m_r_status: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
first_non_n_m_r_status: Mapped[String] = mapped_column(nullable=True)
timestamp_millis_of_current_status: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
current_status: Mapped[String] = mapped_column(nullable=True)
timestamp_millis_of_latest_non_n_m_r_status: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
most_recent_non_n_m_r_status: Mapped[String] = mapped_column(nullable=True)
timestamp_millis_of_status_lock: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
locked_status: Mapped[String] = mapped_column(nullable=True)
timestamp_millis_of_retro_lock: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
current_core_status: Mapped[String] = mapped_column(nullable=True)
current_expansion_status: Mapped[String] = mapped_column(nullable=True)
current_group_status: Mapped[String] = mapped_column(nullable=True)
current_decided_by: Mapped[String] = mapped_column(nullable=True)
current_modeling_group: Mapped[int] = mapped_column(nullable=True)
timestamp_millis_of_most_recent_status_change: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
timestamp_millis_of_nmr_due_to_min_stable_crh_time: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
current_multi_group_status: Mapped[String] = mapped_column(nullable=True)
current_modeling_multi_group: Mapped[int] = mapped_column(nullable=True)
timestamp_minute_of_final_scoring_output: Mapped[TwitterTimestamp] = mapped_column(nullable=True)
timestamp_millis_of_first_nmr_due_to_min_stable_crh_time: Mapped[TwitterTimestamp] = mapped_column(nullable=True)


class RowPostRecord(Base):
Expand Down
50 changes: 20 additions & 30 deletions etl/src/birdxplorer_etl/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@
from prefect import get_run_logger
from sqlalchemy.orm import Session
from lib.x.postlookup import lookup
from birdxplorer_common.storage import (
RowNoteRecord,
RowPostRecord,
RowUserRecord,
RowPostEmbedURLRecord,
RowNoteStatusRecord,
)
from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord, RowNoteStatusRecord
import settings


Expand All @@ -36,7 +30,10 @@ def extract_data(db: Session):
break

dateString = date.strftime("%Y/%m/%d")
note_url = f"https://ton.twimg.com/birdwatch-public-data/{dateString}/notes/notes-00000.tsv"
# note_url = f"https://ton.twimg.com/birdwatch-public-data/{dateString}/notes/notes-00000.tsv"
note_url = (
"https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/notes_sample.tsv"
)
logger.info(note_url)
res = requests.get(note_url)

Expand All @@ -47,28 +44,37 @@ def extract_data(db: Session):
reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

rows_to_add = []
for row in reader:
for index, row in enumerate(reader):
if db.query(RowNoteRecord).filter(RowNoteRecord.note_id == row["note_id"]).first():
continue
rows_to_add.append(RowNoteRecord(**row))
if index % 1000 == 0:
db.bulk_save_objects(rows_to_add)
rows_to_add = []
db.bulk_save_objects(rows_to_add)

status_url = f"https://ton.twimg.com/birdwatch-public-data/{dateString}/noteStatusHistory/noteStatusHistory-00000.tsv"
# status_url = f"https://ton.twimg.com/birdwatch-public-data/{dateString}/noteStatusHistory/noteStatusHistory-00000.tsv"
status_url = "https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/noteStatus_sample.tsv"
logger.info(status_url)
res = requests.get(status_url)

if res.status_code == 200:
# Write res.content into the NoteStatus table in the DB
tsv_data = res.content.decode("utf-8").splitlines()
reader = csv.DictReader(tsv_data, delimiter="\t")
reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

rows_to_add = []
for row in reader:
for index, row in enumerate(reader):
for key, value in list(row.items()):
if value == "":
row[key] = None
status = db.query(RowNoteStatusRecord).filter(RowNoteStatusRecord.note_id == row["note_id"]).first()
if status is None or status.created_at_millis > int(datetime.now().time() * 1000):
if status is None or status.created_at_millis > int(datetime.now().timestamp() * 1000):
db.query(RowNoteStatusRecord).filter(RowNoteStatusRecord.note_id == row["note_id"]).delete()
rows_to_add.append(RowNoteStatusRecord(**row))
if index % 1000 == 0:
db.bulk_save_objects(rows_to_add)
rows_to_add = []
db.bulk_save_objects(rows_to_add)

break
Expand All @@ -85,9 +91,7 @@ def extract_data(db: Session):
.filter(RowNoteRecord.created_at_millis <= settings.TARGET_TWITTER_POST_END_UNIX_MILLISECOND)
.all()
)

logger.info(f"Num of Target Notes: {len(postExtract_targetNotes)}")

logger.info(len(postExtract_targetNotes))
for note in postExtract_targetNotes:
tweet_id = note.tweet_id

Expand All @@ -100,8 +104,6 @@ def extract_data(db: Session):
logger.info(tweet_id)
post = lookup(tweet_id)

logger.info(post)

if post == None or "data" not in post:
continue

Expand Down Expand Up @@ -132,18 +134,6 @@ def extract_data(db: Session):
)
db.add(db_user)

if "entities" in post["data"] and "urls" in post["data"]["entities"]:
for url in post["data"]["entities"]["urls"]:
if "unwound_url" not in url or url["status"] != 200:
continue
db_post_embed_url = RowPostEmbedURLRecord(
post_id=post["data"]["id"],
url=url["url"],
expanded_url=url["expanded_url"],
unwound_url=url["unwound_url"],
)
db.add(db_post_embed_url)

media_data = (
post["includes"]["media"][0]
if "includes" in post and "media" in post["includes"] and len(post["includes"]["media"]) > 0
Expand Down
42 changes: 27 additions & 15 deletions etl/src/birdxplorer_etl/transform.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from sqlalchemy import select, func, and_, Integer
from sqlalchemy.orm import Session
from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord
from birdxplorer_common.storage import RowNoteRecord, RowPostRecord, RowUserRecord, RowNoteStatusRecord
from birdxplorer_etl.lib.ai_model.ai_model_interface import get_ai_service
from birdxplorer_etl.settings import (
TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND,
Expand All @@ -23,7 +23,7 @@ def transform_data(db: Session):
os.remove("./data/transformed/note.csv")
with open("./data/transformed/note.csv", "a") as file:
writer = csv.writer(file)
writer.writerow(["note_id", "post_id", "summary", "created_at", "language"])
writer.writerow(["note_id", "post_id", "summary", "current_status", "created_at", "language"])

offset = 0
limit = 1000
Expand All @@ -49,14 +49,10 @@ def transform_data(db: Session):
RowNoteRecord.note_id,
RowNoteRecord.row_post_id,
RowNoteRecord.summary,
RowNoteStatusRecord.current_status,
func.cast(RowNoteRecord.created_at_millis, Integer).label("created_at"),
)
.filter(
and_(
RowNoteRecord.created_at_millis <= TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND,
RowNoteRecord.created_at_millis >= TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND,
)
)
.join(RowNoteStatusRecord, RowNoteRecord.note_id == RowNoteStatusRecord.note_id)
.limit(limit)
.offset(offset)
)
Expand Down Expand Up @@ -170,7 +166,7 @@ def transform_data(db: Session):
return


def generate_note_topic():
def generate_note_topic(db: Session):
note_csv_file_path = "./data/transformed/note.csv"
output_csv_file_path = "./data/transformed/note_topic_association.csv"
ai_service = get_ai_service()
Expand All @@ -181,17 +177,33 @@ def generate_note_topic():
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()

with open(note_csv_file_path, newline="", encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
for index, row in enumerate(reader):
note_id = row["note_id"]
summary = row["summary"]
offset = 0
limit = 1000

num_of_users = db.query(func.count(RowUserRecord.user_id)).scalar()

while offset < num_of_users:
topicEstimationTargetNotes = db.execute(
select(RowNoteRecord.note_id, RowNoteRecord.row_post_id, RowNoteRecord.summary)
.filter(
and_(
RowNoteRecord.created_at_millis <= TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND,
RowNoteRecord.created_at_millis >= TARGET_NOTE_ESTIMATE_TOPIC_START_UNIX_MILLISECOND,
)
)
.join(RowNoteStatusRecord, RowNoteRecord.note_id == RowNoteStatusRecord.note_id)
.limit(limit)
.offset(offset)
)

for index, note in enumerate(topicEstimationTargetNotes):
note_id = note.note_id
summary = note.summary
topics_info = ai_service.detect_topic(note_id, summary)
if topics_info:
for topic in topics_info.get("topics", []):
record = {"note_id": note_id, "topic_id": topic}
records.append(record)

if index % 100 == 0:
for record in records:
writer.writerow(
Expand Down

0 comments on commit 24c6464

Please sign in to comment.