Skip to content

Commit

Permalink
Merge pull request #125 from codeforjapan/infra/etl-docker
Browse files Browse the repository at this point in the history
load s3
  • Loading branch information
yu23ki14 authored Oct 25, 2024
2 parents 2340664 + 041b356 commit 8d773a8
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 14 deletions.
2 changes: 1 addition & 1 deletion common/birdxplorer_common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class RowPostRecord(Base):
class RowPostMediaRecord(Base):
__tablename__ = "row_post_media"

media_key: Mapped[String] = mapped_column(primary_key=True)
media_key: Mapped[String] = mapped_column(primary_key=True, unique=True)

url: Mapped[String] = mapped_column(nullable=False)
type: Mapped[MediaType] = mapped_column(nullable=False)
Expand Down
3 changes: 2 additions & 1 deletion etl/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ dependencies = [
"pytest",
"prefect",
"stringcase",
"openai"
"openai",
"boto3",
]

[project.urls]
Expand Down
35 changes: 25 additions & 10 deletions etl/src/birdxplorer_etl/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,16 @@ def extract_data(sqlite: Session, postgresql: Session):
lang=post["data"]["lang"],
)
postgresql.add(row_post)
postgresql.commit()

try:
postgresql.commit()
except Exception as e:
logging.error(f"Error: {e}")
postgresql.rollback()

media_recs = [
RowPostMediaRecord(
media_key=m["media_key"],
media_key=f"{m['media_key']}-{post['data']['id']}",
type=m["type"],
url=m.get("url") or (m["variants"][0]["url"] if "variants" in m and m["variants"] else ""),
width=m["width"],
Expand All @@ -186,15 +191,25 @@ def extract_data(sqlite: Session, postgresql: Session):
if "entities" in post["data"] and "urls" in post["data"]["entities"]:
for url in post["data"]["entities"]["urls"]:
if "unwound_url" in url:
post_url = RowPostEmbedURLRecord(
post_id=post["data"]["id"],
url=url["url"] if url["url"] else None,
expanded_url=url["expanded_url"] if url["expanded_url"] else None,
unwound_url=url["unwound_url"] if url["unwound_url"] else None,
is_urlExist = (
postgresql.query(RowPostEmbedURLRecord)
.filter(RowPostEmbedURLRecord.post_id == post["data"]["id"])
.filter(RowPostEmbedURLRecord.url == url["url"])
.first()
)
postgresql.add(post_url)
if is_urlExist is None:
post_url = RowPostEmbedURLRecord(
post_id=post["data"]["id"],
url=url["url"] if url["url"] else None,
expanded_url=url["expanded_url"] if url["expanded_url"] else None,
unwound_url=url["unwound_url"] if url["unwound_url"] else None,
)
postgresql.add(post_url)
note.row_post_id = tweet_id
postgresql.commit()
continue
try:
postgresql.commit()
except Exception as e:
logging.error(f"Error: {e}")
postgresql.rollback()

return
4 changes: 4 additions & 0 deletions etl/src/birdxplorer_etl/lib/sqlite/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
RowUserRecord,
RowPostEmbedURLRecord,
RowNoteStatusRecord,
RowPostMediaRecord,
)


Expand Down Expand Up @@ -53,6 +54,9 @@ def init_postgresql():
if not inspect(engine).has_table("row_post_embed_urls"):
logging.info("Creating table post_embed_urls")
RowPostEmbedURLRecord.metadata.create_all(engine)
if not inspect(engine).has_table("row_post_media"):
logging.info("Creating table post_media")
RowPostMediaRecord.metadata.create_all(engine)

Session = sessionmaker(bind=engine)

Expand Down
33 changes: 31 additions & 2 deletions etl/src/birdxplorer_etl/load.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
import boto3
import logging
import settings
from datetime import datetime

s3 = boto3.client("s3", region_name="ap-northeast-1")


def load_data():
    """Upload the transformed CSV artifacts to the configured S3 bucket.

    The bucket is taken from ``settings.S3_BUCKET_NAME``; when it is unset
    the function logs and returns without uploading. Each run writes its
    files under a key prefix derived from the current local time, so every
    ETL run lands in its own "folder".

    Missing local files are logged and skipped (best effort); any other
    upload failure is logged and does not abort the remaining uploads.
    """
    logging.info("Loading data")

    if not settings.S3_BUCKET_NAME:
        # No bucket configured (e.g. local development) — nothing to do.
        return

    bucket_name = settings.S3_BUCKET_NAME
    # NOTE(review): naive local time, and the key prefix contains a space —
    # legal for S3 but awkward for some tooling. Confirm format is intended.
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
    object_prefix = f"{current_time}/"

    file_names = [
        "./data/transformed/media.csv",
        "./data/transformed/note_topic_association.csv",
        "./data/transformed/note.csv",
        "./data/transformed/post_link_association.csv",
        "./data/transformed/post_link.csv",
        "./data/transformed/post_media_association.csv",
        "./data/transformed/post.csv",
        "./data/transformed/topic.csv",
        "./data/transformed/user.csv",
    ]

    for file_name in file_names:
        try:
            # Key is prefix + bare filename (directory components dropped).
            s3.upload_file(file_name, bucket_name, f"{object_prefix}{file_name.split('/')[-1]}")
            logging.info(f"Successfully uploaded {file_name} to S3")
        except FileNotFoundError:
            # Best effort: a missing artifact should not stop the others.
            logging.error(f"{file_name} not found")
        except Exception as e:
            logging.error(f"Failed to upload {file_name} to S3: {e}")
2 changes: 2 additions & 0 deletions etl/src/birdxplorer_etl/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@
TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND = os.getenv("TARGET_NOTE_ESTIMATE_TOPIC_END_UNIX_MILLISECOND")

USE_DUMMY_DATA = os.getenv("USE_DUMMY_DATA", "False") == "True"

S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")

0 comments on commit 8d773a8

Please sign in to comment.