Skip to content

Commit

Permalink
Get Row note data
Browse files Browse the repository at this point in the history
  • Loading branch information
yu23ki14 committed May 23, 2024
1 parent 0a27638 commit a16f5bd
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 17 deletions.
1 change: 1 addition & 0 deletions common/birdxplorer_common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Base(DeclarativeBase):
HttpUrl: String,
NonNegativeInt: DECIMAL,
MediaDetails: JSON,
BinaryBool: String
}


Expand Down
2 changes: 1 addition & 1 deletion etl/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ authors = [
{name = "yu23ki14"}
]
dependencies = [
"birdxplorer_common @ git+https://github.com/codeforjapan/BirdXplorer.git@feature/issue-53-divide-python-packages#subdirectory=common",
"birdxplorer_common[dev] @ git+https://github.com/codeforjapan/BirdXplorer.git@etl/main#subdirectory=common",
"pandas",
"sqlalchemy",
"requests",
Expand Down
22 changes: 12 additions & 10 deletions etl/src/extract.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,42 @@
import birdxplorer_common.models
from birdxplorer_common.storage import RowNoteRecord
from prefect import get_run_logger
import requests
from datetime import datetime, timedelta
import csv
import birdxplorer_common
from typing import List
import stringcase
from sqlalchemy.orm import Session
import settings
from lib.sqlite.init import init_db

def extract_data():
def extract_data(db: Session):
logger = get_run_logger()
logger.info("Downloading community notes data")

db = init_db()

# 最新のNoteデータを取得
# Noteデータを取得してSQLiteに保存
date = datetime.now()
latest_note = db.query(RowNoteRecord).order_by(RowNoteRecord.created_at_millis.desc()).first()

while True:
if latest_note and int(latest_note.created_at_millis) / 1000 > datetime.timestamp(date) - 24 * 60 * 60 * settings.COMMUNITY_NOTE_DAYS_AGO:
break
url = f'https://ton.twimg.com/birdwatch-public-data/{date.strftime("%Y/%m/%d")}/notes/notes-00000.tsv'
logger.info(url)
res = requests.get(url)

if res.status_code == 200:
# res.contentをdbのNoteテーブル
tsv_data = res.content.decode('utf-8').splitlines()
reader = csv.DictReader(tsv_data, delimiter='\t')
reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames]

for row in reader:
db.add(row)
db.add(RowNoteRecord(**row))
break
date = date - timedelta(days=1)

db.commit()

db.query(birdxplorer_common.models.Note).first()
row1 = db.query(RowNoteRecord).first()
logger.info(row1)

# # Noteに紐づくtweetデータを取得
# for note in notes_data:
Expand Down
12 changes: 10 additions & 2 deletions etl/src/lib/sqlite/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@
from sqlalchemy.orm import sessionmaker
import os
from prefect import get_run_logger
from birdxplorer_common.storage import Row
from birdxplorer_common.storage import RowNoteRecord
from sqlalchemy import inspect

def init_db():
    """Create (if needed) the local SQLite working database and return a session.

    Builds ``data/note.db`` relative to the project root, ensures the staging
    tables exist, and returns a fresh SQLAlchemy ``Session`` bound to it.

    Returns:
        sqlalchemy.orm.Session: a new session the caller owns (and must close).
    """
    logger = get_run_logger()

    # ToDo: the db file should live in external storage such as S3.
    db_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data', 'note.db')
    )
    logger.info(f'Initializing database at {db_path}')
    engine = create_engine('sqlite:///' + db_path)
    Base.metadata.create_all(engine)

    # Create the temporary staging tables. create_all above already skips
    # existing tables, so this guard mainly gives us a first-creation log line.
    # NOTE(review): assumes RowNoteRecord maps to table 'note' — confirm
    # against birdxplorer_common.storage.
    # ToDo: add tables other than the note table as they become necessary.
    if not inspect(engine).has_table('note'):
        logger.info('Creating table note')
        RowNoteRecord.metadata.create_all(engine)

    # Lowercase local name: avoid shadowing the sqlalchemy Session class style.
    session_factory = sessionmaker(bind=engine)

    return session_factory()
14 changes: 11 additions & 3 deletions etl/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@
from extract import extract_data
from transform import transform_data
from load import load_data
from lib.sqlite.init import init_db
from sqlalchemy.orm import Session

@task
def initialize():
    """Prefect task: open the working SQLite database for downstream tasks.

    Returns:
        dict: ``{'db': Session}`` — wrapped in a dict so the flow can pass
        named resources (and later add more) between tasks.
    """
    db = init_db()
    return {'db': db}

@task
def extract(db: Session):
    """Prefect task: run the community-notes extract step against *db*.

    Thin wrapper so the flow graph shows a distinct 'extract' task; all work
    happens in :func:`extract_data`.
    """
    extract_data(db)

@task
def transform():
Expand All @@ -17,7 +24,8 @@ def load():

@flow
def run_etl():
e = extract()
i = initialize()
e = extract(i['db'])
t = transform()
l = load()

Expand Down
5 changes: 4 additions & 1 deletion etl/src/settings.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# Unix-epoch milliseconds bounding which X/Twitter posts the ETL targets.
# NOTE(review): END (2019-12-31 23:59:59 UTC) precedes START (2020-01-01
# 00:00:00 UTC) — confirm these bounds are intentional and not swapped.
TARGET_TWITTER_POST_START_UNIX_MILLISECOND = 1577836800000
TARGET_TWITTER_POST_END_UNIX_MILLISECOND = 1577836799000

# How many days back the extract step treats as the "latest" community-notes
# data; 3 days is convenient during development.
COMMUNITY_NOTE_DAYS_AGO = 3

0 comments on commit a16f5bd

Please sign in to comment.