diff --git a/common/birdxplorer_common/storage.py b/common/birdxplorer_common/storage.py index 0b821e2..524c168 100644 --- a/common/birdxplorer_common/storage.py +++ b/common/birdxplorer_common/storage.py @@ -52,6 +52,7 @@ class Base(DeclarativeBase): HttpUrl: String, NonNegativeInt: DECIMAL, MediaDetails: JSON, + BinaryBool: String } diff --git a/etl/pyproject.toml b/etl/pyproject.toml index b6a97af..e430be7 100644 --- a/etl/pyproject.toml +++ b/etl/pyproject.toml @@ -11,7 +11,7 @@ authors = [ {name = "yu23ki14"} ] dependencies = [ - "birdxplorer_common @ git+https://github.com/codeforjapan/BirdXplorer.git@feature/issue-53-divide-python-packages#subdirectory=common", + "birdxplorer_common[dev] @ git+https://github.com/codeforjapan/BirdXplorer.git@etl/main#subdirectory=common", "pandas", "sqlalchemy", "requests", diff --git a/etl/src/extract.py b/etl/src/extract.py index 0c6ef1e..94bccb8 100644 --- a/etl/src/extract.py +++ b/etl/src/extract.py @@ -1,26 +1,27 @@ -import birdxplorer_common.models +from birdxplorer_common.storage import RowNoteRecord from prefect import get_run_logger import requests from datetime import datetime, timedelta import csv -import birdxplorer_common -from typing import List import stringcase +from sqlalchemy.orm import Session import settings -from lib.sqlite.init import init_db -def extract_data(): +def extract_data(db: Session): logger = get_run_logger() logger.info("Downloading community notes data") - db = init_db() - - # 最新のNoteデータを取得 + # Noteデータを取得してSQLiteに保存 date = datetime.now() + latest_note = db.query(RowNoteRecord).order_by(RowNoteRecord.created_at_millis.desc()).first() + while True: + if latest_note and int(latest_note.created_at_millis) / 1000 > datetime.timestamp(date) - 24 * 60 * 60 * settings.COMMUNITY_NOTE_DAYS_AGO: + break url = f'https://ton.twimg.com/birdwatch-public-data/{date.strftime("%Y/%m/%d")}/notes/notes-00000.tsv' logger.info(url) res = requests.get(url) + if res.status_code == 200: # res.contentをdbのNoteテーブル tsv_data = res.content.decode('utf-8').splitlines() @@ -28,13 +29,14 @@ def extract_data(): reader.fieldnames = [stringcase.snakecase(field) for field in reader.fieldnames] for row in reader: - db.add(row) + db.add(RowNoteRecord(**row)) break date = date - timedelta(days=1) db.commit() - db.query(birdxplorer_common.models.Note).first() + row1 = db.query(RowNoteRecord).first() + logger.info(row1) # # Noteに紐づくtweetデータを取得 # for note in notes_data: diff --git a/etl/src/lib/sqlite/init.py b/etl/src/lib/sqlite/init.py index 599f351..b5d72fe 100644 --- a/etl/src/lib/sqlite/init.py +++ b/etl/src/lib/sqlite/init.py @@ -3,15 +3,23 @@ from sqlalchemy.orm import sessionmaker import os from prefect import get_run_logger -from birdxplorer_common.storage import Row +from birdxplorer_common.storage import RowNoteRecord +from sqlalchemy import inspect def init_db(): logger = get_run_logger() + # ToDo: dbファイルをS3など外部に置く必要がある。 db_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'data', 'note.db')) logger.info(f'Initializing database at {db_path}') engine = create_engine('sqlite:///' + db_path) - Base.metadata.create_all(engine) + + # 一時データベースのテーブル作成する + # ToDo: noteテーブル以外に必要なものを追加 + if not inspect(engine).has_table('note'): + logger.info('Creating table note') + RowNoteRecord.metadata.create_all(engine) + Session = sessionmaker(bind=engine) return Session() diff --git a/etl/src/main.py b/etl/src/main.py index 16ff236..aa996a4 100644 --- a/etl/src/main.py +++ b/etl/src/main.py @@ -2,10 +2,17 @@ from extract import extract_data from transform import transform_data from load import load_data +from lib.sqlite.init import init_db +from sqlalchemy.orm import Session @task -def extract(): - extract_data() +def initialize(): + db = init_db() + return {'db': db} + +@task +def extract(db: Session): + extract_data(db) @task def transform(): @@ -17,7 +24,8 @@ def load(): @flow def run_etl(): - e = extract() + i = initialize() + e = extract(i['db']) t = transform() l = load() diff --git a/etl/src/settings.py b/etl/src/settings.py index 6844a24..de82397 100644 --- a/etl/src/settings.py +++ b/etl/src/settings.py @@ -1,2 +1,5 @@ TARGET_TWITTER_POST_START_UNIX_MILLISECOND = 1577836800000 -TARGET_TWITTER_POST_END_UNIX_MILLISECOND = 1577836799000 \ No newline at end of file +TARGET_TWITTER_POST_END_UNIX_MILLISECOND = 1577836799000 + +# Extractで何日前のデータを最新と定義するか。開発中は3日前が楽。 +COMMUNITY_NOTE_DAYS_AGO=3 \ No newline at end of file