diff --git a/common/birdxplorer_common/storage.py b/common/birdxplorer_common/storage.py index 6c34ed0..df77fc4 100644 --- a/common/birdxplorer_common/storage.py +++ b/common/birdxplorer_common/storage.py @@ -224,7 +224,7 @@ class RowPostRecord(Base): class RowPostMediaRecord(Base): __tablename__ = "row_post_media" - media_key: Mapped[String] = mapped_column(primary_key=True) + media_key: Mapped[String] = mapped_column(primary_key=True, unique=True) url: Mapped[String] = mapped_column(nullable=False) type: Mapped[MediaType] = mapped_column(nullable=False) diff --git a/etl/pyproject.toml b/etl/pyproject.toml index 9c31ed1..a32d049 100644 --- a/etl/pyproject.toml +++ b/etl/pyproject.toml @@ -30,7 +30,8 @@ dependencies = [ "pytest", "prefect", "stringcase", - "openai" + "openai", + "boto3", ] [project.urls] diff --git a/etl/src/birdxplorer_etl/extract.py b/etl/src/birdxplorer_etl/extract.py index 5a276a4..ee9b306 100644 --- a/etl/src/birdxplorer_etl/extract.py +++ b/etl/src/birdxplorer_etl/extract.py @@ -168,11 +168,16 @@ def extract_data(sqlite: Session, postgresql: Session): lang=post["data"]["lang"], ) postgresql.add(row_post) - postgresql.commit() + + try: + postgresql.commit() + except Exception as e: + logging.error(f"Error: {e}") + postgresql.rollback() media_recs = [ RowPostMediaRecord( - media_key=m["media_key"], + media_key=f"{m['media_key']}-{post['data']['id']}", type=m["type"], url=m.get("url") or (m["variants"][0]["url"] if "variants" in m and m["variants"] else ""), width=m["width"], @@ -186,15 +191,25 @@ def extract_data(sqlite: Session, postgresql: Session): if "entities" in post["data"] and "urls" in post["data"]["entities"]: for url in post["data"]["entities"]["urls"]: if "unwound_url" in url: - post_url = RowPostEmbedURLRecord( - post_id=post["data"]["id"], - url=url["url"] if url["url"] else None, - expanded_url=url["expanded_url"] if url["expanded_url"] else None, - unwound_url=url["unwound_url"] if url["unwound_url"] else None, + is_urlExist = ( + postgresql.query(RowPostEmbedURLRecord) + .filter(RowPostEmbedURLRecord.post_id == post["data"]["id"]) + .filter(RowPostEmbedURLRecord.url == url["url"]) + .first() ) - postgresql.add(post_url) + if is_urlExist is None: + post_url = RowPostEmbedURLRecord( + post_id=post["data"]["id"], + url=url["url"] if url["url"] else None, + expanded_url=url["expanded_url"] if url["expanded_url"] else None, + unwound_url=url["unwound_url"] if url["unwound_url"] else None, + ) + postgresql.add(post_url) note.row_post_id = tweet_id - postgresql.commit() - continue + try: + postgresql.commit() + except Exception as e: + logging.error(f"Error: {e}") + postgresql.rollback() return diff --git a/etl/src/birdxplorer_etl/lib/sqlite/init.py b/etl/src/birdxplorer_etl/lib/sqlite/init.py index 21c082a..253eb54 100644 --- a/etl/src/birdxplorer_etl/lib/sqlite/init.py +++ b/etl/src/birdxplorer_etl/lib/sqlite/init.py @@ -11,6 +11,7 @@ RowUserRecord, RowPostEmbedURLRecord, RowNoteStatusRecord, + RowPostMediaRecord, ) @@ -53,6 +54,9 @@ def init_postgresql(): if not inspect(engine).has_table("row_post_embed_urls"): logging.info("Creating table post_embed_urls") RowPostEmbedURLRecord.metadata.create_all(engine) + if not inspect(engine).has_table("row_post_media"): + logging.info("Creating table post_media") + RowPostMediaRecord.metadata.create_all(engine) Session = sessionmaker(bind=engine) diff --git a/etl/src/birdxplorer_etl/load.py b/etl/src/birdxplorer_etl/load.py index 5412d7d..7eef8f9 100644 --- a/etl/src/birdxplorer_etl/load.py +++ b/etl/src/birdxplorer_etl/load.py @@ -1,6 +1,35 @@ +import boto3 import logging +import settings +from datetime import datetime + +s3 = boto3.client("s3", region_name="ap-northeast-1") def load_data(): - logging.info("Loading data") - return + + if settings.S3_BUCKET_NAME: + bucket_name = settings.S3_BUCKET_NAME + current_time = datetime.now().strftime("%Y-%m-%d %H:%M") + objectPrefix = f"{current_time}/" + + fileNames = [ + "./data/transformed/media.csv", + "./data/transformed/note_topic_association.csv", + "./data/transformed/note.csv", + "./data/transformed/post_link_association.csv", + "./data/transformed/post_link.csv", + "./data/transformed/post_media_association.csv", + "./data/transformed/post.csv", + "./data/transformed/topic.csv", + "./data/transformed/user.csv", + ] + + for fileName in fileNames: + try: + s3.upload_file(fileName, bucket_name, f"{objectPrefix}{fileName.split('/')[-1]}") + logging.info(f"Successfully uploaded {fileName} to S3") + except FileNotFoundError: + logging.error(f"{fileName} not found") + except Exception as e: + logging.error(f"Failed to upload {fileName} to S3: {e}") diff --git a/etl/src/birdxplorer_etl/settings.py b/etl/src/birdxplorer_etl/settings.py index c775af3..370bbdd 100644 --- a/etl/src/birdxplorer_etl/settings.py +++ b/etl/src/birdxplorer_etl/settings.py @@ -19,3 +19,5 @@ TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND = os.getenv("TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND") USE_DUMMY_DATA = os.getenv("USE_DUMMY_DATA", "False") == "True" + +S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")