diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index b175150..50f5f48 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -53,4 +53,6 @@ jobs:
           DB_PASSWORD: ${{ secrets.DB_PASSWORD }}
           DB_HOST_URL: ${{ secrets.DB_HOST_URL }}
           DB_PORT: ${{ secrets.DB_PORT }}
+          SUPABASE_WATCHDB_URL: ${{ secrets.SUPABASE_WATCHDB_URL }}
+          SUPABASE_WATCHDB_SERVICE_ROLE_KEY: ${{ secrets.SUPABASE_WATCHDB_SERVICE_ROLE_KEY }}
         run: python src/data_collection/watchdb_populator.py --num_requests 25
diff --git a/src/data_collection/crud.py b/src/data_collection/crud.py
deleted file mode 100644
index 3f74ee9..0000000
--- a/src/data_collection/crud.py
+++ /dev/null
@@ -1,58 +0,0 @@
-from datetime import datetime
-from decimal import Decimal
-from typing import List, Optional
-
-from models import QueuedPost, Watch
-from sqlalchemy import select, update
-from sqlalchemy.dialects.postgresql import UUID, insert
-from sqlalchemy.exc import IntegrityError
-from sqlalchemy.sql import and_, or_
-
-
-def create_queued_post(session, post: QueuedPost):
-    try:
-        session.add(post)
-        session.commit()
-        session.refresh(post)
-        return post
-    except IntegrityError:
-        session.rollback()
-        return None
-
-
-def get_queued_posts(session, limit: int):
-    return (
-        session.query(QueuedPost)
-        .filter(QueuedPost.processed == False)
-        .limit(limit)
-        .all()
-    )
-
-
-def mark_post_as_processed(session, post_id: str):
-    post = session.query(QueuedPost).filter(QueuedPost.post_id == post_id).first()
-    if post:
-        post.processed = True
-        session.commit()
-        return True
-    return False
-
-
-def delete_processed_posts(session):
-    session.query(QueuedPost).filter(QueuedPost.processed == True).delete()
-    session.commit()
-
-
-def create_watch(session, watch: Watch):
-    session.add(watch)
-    session.commit()
-    session.refresh(watch)
-    return watch
-
-
-def get_watches_by_brand_model(session, brand: str, ref: str):
-    return (
-        session.query(Watch)
-        .filter(and_(Watch.Brand == brand, Watch.Reference_Number == ref))
-        .all()
-    )
diff --git a/src/data_collection/database.py b/src/data_collection/database.py
deleted file mode 100644
index c449401..0000000
--- a/src/data_collection/database.py
+++ /dev/null
@@ -1,29 +0,0 @@
-import os
-
-from dotenv import load_dotenv
-from models import Base
-from sqlalchemy import Column, Integer, String, create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
-
-load_dotenv()
-
-DATABASE_USERNAME = os.getenv("DB_USER")
-DATABASE_PASSWORD = os.getenv("DB_PASSWORD")
-DATABASE_HOST_URL = os.getenv("DB_HOST_URL")
-DATABASE_PORT = os.getenv("DB_PORT")
-sql_url = f"postgresql://{DATABASE_USERNAME}:{DATABASE_PASSWORD}@{DATABASE_HOST_URL}:{DATABASE_PORT}/postgres"
-
-print("THIS IS THE URL: ", sql_url)
-
-engine = create_engine(sql_url, echo=False, future=True, pool_pre_ping=True)
-SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
-
-
-def init_db():
-    with engine.connect() as conn:
-        with conn.begin():
-            Base.metadata.create_all(engine)
-
-
-init_db()
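Review note: the two secrets added to the workflow replace the raw Postgres connection string that database.py (deleted above) assembled and, worse, printed to stdout. A minimal sketch of the client setup these variables feed, mirroring the initialization the new watchdb_populator.py performs:

```python
import os

from supabase import create_client, Client

# In CI these come from GitHub Actions secrets; locally they can be loaded
# from a .env file with python-dotenv, as the populator does.
url: str = os.environ["SUPABASE_WATCHDB_URL"]
key: str = os.environ["SUPABASE_WATCHDB_SERVICE_ROLE_KEY"]
supabase: Client = create_client(url, key)
```

Unlike the old code path, nothing here echoes credentials into the build log.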
diff --git a/src/data_collection/lambda_function.py b/src/data_collection/lambda_function.py
deleted file mode 100644
index e710c7c..0000000
--- a/src/data_collection/lambda_function.py
+++ /dev/null
@@ -1,169 +0,0 @@
-import argparse
-import json
-import logging
-import os
-from datetime import datetime
-
-import praw
-from crud import (
-    create_queued_post,
-    create_watch,
-    get_queued_posts,
-    mark_post_as_processed,
-)
-from database import SessionLocal
-from dotenv import load_dotenv
-from jsonschema.exceptions import ValidationError
-from models import QueuedPost, Watch
-from openai import OpenAI
-from postgrest.exceptions import APIError
-from schema_validator import validate_schema
-
-logging.basicConfig(filename="app.log", level=logging.INFO)
-load_dotenv()
-
-
-def reddit_crawler(time_filter, post_limit, comments_limit):
-    # Reddit API Credentials
-    client_id = os.environ.get("REDDIT_APP_ID")
-    client_secret = os.environ.get("REDDIT_APP_KEY")
-
-    logging.info(f"Reddit client_id: {client_id}")
-    logging.info(f"Reddit client_secret: {client_secret}")
-
-    # Initialize PRAW with credentials
-    user_agent = "User-Agent:chrono-codex-server:v1 (by /u/ChronoCrawler)"
-    reddit = praw.Reddit(
-        client_id=client_id, client_secret=client_secret, user_agent=user_agent
-    )
-    logging.info("PRAW Reddit client initialized successfully.")
-
-    subreddit = reddit.subreddit("watchexchange")
-    logging.info(f"Subreddit set to: {subreddit.display_name}")
-
-    # Fetch the top posts from the subreddit
-    top_posts = subreddit.top(time_filter=time_filter, limit=post_limit)
-    logging.info(
-        f"Fetched top posts with time_filter={time_filter} and post_limit={post_limit}"
-    )
-
-    # Push the data collected to Supabase
-    for post in top_posts:
-        post.comments.replace_more(limit=comments_limit)  # Load all comments
-        comments = " | ".join(
-            [
-                f"{comment.author.name}: {comment.body}"
-                for comment in post.comments.list()
-                if comment.author and comment.author.name and comment.body
-            ]
-        )
-        logging.debug(f"Collected comments for post ID: {post.id}")
-
-        post_data = QueuedPost(
-            post_id=post.id,
-            created_at=datetime.utcfromtimestamp(post.created_utc).strftime(
-                "%Y-%m-%d %H:%M:%S"
-            ),
-            author_id=post.author.name,
-            title=post.title,
-            url=post.url,
-            comments=comments,
-        )
-
-        try:
-            with SessionLocal() as session:
-                create_queued_post(session, post_data)
-            logging.info(f"Data inserted successfully for post ID: {post.id}")
-        except APIError as api_error:
-            logging.error(f"API Error: {api_error}")
-            if api_error.code == "23505":
-                logging.warning(f"Duplicate entry ({post.id}), skipping")
-            else:
-                raise api_error
-
-
-def process_queue(num_requests):
-    openai_key = os.environ.get("OPENAI_API_CHRONO_KEY")
-    client = OpenAI(api_key=openai_key)
-
-    with open("query_schema.json") as f:
-        output_schema_str = f.read()
-
-    try:
-        with SessionLocal() as session:
-            queue_data = get_queued_posts(session, num_requests)
-
-    except Exception as e:
-        logging.error(f"Failed to fetch data from Supabase (rqueue): {str(e)}")
-        return
-
-    for item in queue_data:
-        try:
-            relevant_data = item.get_relevant_data()
-            prompt = f"Given the data: {relevant_data}, construct a JSON object that adheres to the specified output schema. Output schema: {output_schema_str}"
-            response = client.chat.completions.create(
-                model="gpt-3.5-turbo-0125",
-                response_format={"type": "json_object"},
-                messages=[
-                    {
-                        "role": "system",
-                        "content": "You are a helpful assistant that outputs valid JSON.>",
-                    },
-                    {"role": "user", "content": prompt},
-                ],
-            )
-            response_json = json.loads(response.choices[0].message.content)
-            logging.info(f"Response JSON: {response_json}")
-            validated_response = validate_schema(response_json)
-            watch = Watch(
-                Brand=validated_response.get("Brand", "Unknown"),
-                Reference_Number=validated_response.get("Reference Number", "None"),
-                Timestamp=item.created_at,
-                Model=validated_response.get("Model", None),
-                Case_Material=validated_response.get("Case Material", None),
-                Case_Diameter=validated_response.get("Case Diameter", None),
-                Case_Thickness=validated_response.get("Case Thickness", None),
-                Lug_Width=validated_response.get("Lug Width", None),
-                Lug_to_Lug=validated_response.get("Lug-to-Lug", None),
-                Dial_Color=validated_response.get("Dial Color", None),
-                Crystal_Type=validated_response.get("Crystal Type", None),
-                Water_Resistance=validated_response.get("Water Resistance", None),
-                Movement=validated_response.get("Movement", None),
-                Caliber=validated_response.get("Caliber", None),
-                Movement_Type=validated_response.get("Movement Type", None),
-                Power_Reserve=validated_response.get("Power Reserve", None),
-                Bracelet_Strap_Material=validated_response.get(
-                    "Bracelet/Strap Material", None
-                ),
-                Clasp_Type=validated_response.get("Clasp Type", None),
-                Product_Weight=validated_response.get("Product Weight", None),
-                Features=validated_response.get("Features", None),
-                Price=validated_response.get("Price", None),
-                Availability=validated_response.get("Availability", None),
-                Photo_URL=validated_response.get("Photo URL", None),
-                Merchant_Name=validated_response.get("Merchant Name", None),
-                Product_URL=validated_response.get("Product URL", None),
-            )
-            with SessionLocal() as session:
-                create_watch(session, watch)
-                mark_post_as_processed(session, item.post_id)
-        except json.JSONDecodeError as json_err:
-            logging.error(
-                f"Error in parsing the JSON outputted by OpenAI:\n\t {json_err}"
-            )
-        except ValidationError as e:
-            logging.error(
-                f"Schema Validation failed, likely missing some data:\n\tjson:{response_json}\n\terr:{e}"
-            )
-        except Exception as e:
-            logging.error(f"Unknown Exception: {e}")
-            raise
-
-
-def lambda_handler(event, context):
-    reddit_crawler("hour", 25, 10)
-    process_queue(30)
-    return {"statusCode": 200, "body": json.dumps("Hello from Lambda!")}
-
-
-lambda_handler(None, None)
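Review note: the deleted Lambda treated Postgres error 23505 (unique violation surfaced through PostgREST) as "post already queued", which keeps the crawler idempotent across overlapping runs. The new watchdb_populator.py keeps the same pattern; isolated as a standalone sketch (the helper name is hypothetical, the table name queued_posts follows the new populator):

```python
from postgrest.exceptions import APIError


def enqueue_post(supabase, post_data: dict) -> bool:
    """Insert a crawled post; a 23505 unique violation means it is already queued."""
    try:
        supabase.table("queued_posts").insert(post_data).execute()
        return True
    except APIError as err:
        if err.code == "23505":  # duplicate post_id, safe to skip
            return False
        raise
```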
diff --git a/src/data_collection/models.py b/src/data_collection/models.py
deleted file mode 100644
index cb80ec8..0000000
--- a/src/data_collection/models.py
+++ /dev/null
@@ -1,61 +0,0 @@
-import datetime
-import json
-
-from dotenv import load_dotenv
-from sqlalchemy import Boolean, Column, DateTime, Float, Integer, String, create_engine
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
-
-# Base class for all models
-Base = declarative_base()
-
-
-class QueuedPost(Base):
-    __tablename__ = "queued_posts"
-    post_id = Column(String, primary_key=True, nullable=False)
-    created_at = Column(DateTime, default=datetime.datetime.utcnow)
-    author_id = Column(String, nullable=False)
-    title = Column(String, nullable=False)
-    url = Column(String, nullable=False)
-    comments = Column(String, nullable=False)
-    processed = Column(Boolean, default=False)
-
-    def get_relevant_data(self):
-        return json.dumps(
-            {
-                "author_id": self.author_id,
-                "title": self.title,
-                "comments": self.comments,
-                "url": self.url,
-            }
-        )
-
-
-class Watch(Base):
-    __tablename__ = "watches"
-    Brand = Column(String, primary_key=True)
-    Reference_Number = Column(String, primary_key=True)
-    Timestamp = Column(DateTime, primary_key=True)
-    Model = Column(String, primary_key=True, nullable=False)
-
-    Case_Material = Column(String, nullable=True)
-    Case_Diameter = Column(Float, nullable=True)
-    Case_Thickness = Column(Float, nullable=True)
-    Lug_Width = Column(Float, nullable=True)
-    Lug_to_Lug = Column(Float, nullable=True)
-    Dial_Color = Column(String, nullable=True)
-    Crystal_Type = Column(String, nullable=True)
-    Water_Resistance = Column(String, nullable=True)
-    Movement = Column(String, nullable=True)
-    Caliber = Column(String, nullable=True)
-    Movement_Type = Column(String, nullable=True)
-    Power_Reserve = Column(String, nullable=True)
-    Bracelet_Strap_Material = Column(String, nullable=True)
-    Clasp_Type = Column(String, nullable=True)
-    Product_Weight = Column(String, nullable=True)
-    Features = Column(String, nullable=True)
-    Price = Column(Float, nullable=True)
-    Availability = Column(String, nullable=True)
-    Photo_URL = Column(String, nullable=True)
-    Merchant_Name = Column(String, nullable=True)
-    Product_URL = Column(String, nullable=True)
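Review note: the removed Watch model keyed rows on the composite primary key (Brand, Reference_Number, Timestamp, Model), so the same reference could be recorded repeatedly over time, effectively a naive price history. A sketch of how the removed modules composed (database, crud, and models are the files deleted in this diff; the example brand and reference come from test.ipynb further down):

```python
from crud import get_watches_by_brand_model
from database import SessionLocal  # module also ran init_db() on import

# One reference, many Timestamps: each crawl that re-sees the watch adds a row.
with SessionLocal() as session:
    listings = get_watches_by_brand_model(session, brand="Seiko", ref="SPB441J1")
    for w in listings:
        print(w.Timestamp, w.Price)
```

The Supabase tables that replace this presumably need the same composite key for the insert-then-catch-23505 flow in the new populator to behave equivalently.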
diff --git a/src/data_collection/query_schema.json b/src/data_collection/query_schema.json
index 0501ea4..59d76d3 100644
--- a/src/data_collection/query_schema.json
+++ b/src/data_collection/query_schema.json
@@ -1,28 +1,26 @@
 {
-    "Brand": "string",
-    "Model": "string",
-    "Reference Number": "string",
-    "Case Material": "string",
-    "Case Diameter": "number",
-    "Case Thickness": "number",
-    "Lug Width": "number",
-    "Lug-to-Lug": "number",
-    "Dial Color": "string",
-    "Crystal Type": "string",
-    "Water Resistance": "string",
+    "Brand": {"type": "string", "required": true},
+    "Model": {"type": "string", "required": true},
+    "Reference_Number": {"type": "string", "required": true},
+    "Case_Material": "string",
+    "Case_Diameter": "number",
+    "Case_Thickness": "number",
+    "Lug_Width": "number",
+    "Lug_to_Lug": "number",
+    "Dial_Color": "string",
+    "Crystal_Type": "string",
+    "Water_Resistance": "string",
     "Movement": "string",
     "Caliber": "string",
-    "Movement Type": "string",
-    "Power Reserve": "string",
-    "Bracelet/Strap Material": "string",
-    "Clasp Type": "string",
-    "Product Weight": "string",
+    "Movement_Type": "string",
+    "Power_Reserve": "string",
+    "Bracelet_Strap_Material": "string",
+    "Clasp_Type": "string",
+    "Product_Weight": "string",
     "Features": "string",
     "Price": "number",
     "Availability": "string",
-    "Photo URL": "string",
-    "Merchant Name": "string",
-    "Product URL": "string"
+    "Photo_URL": "string",
+    "Merchant_Name": "string",
+    "Product_URL": "string"
 }
-
-
\ No newline at end of file
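Review note: query_schema.json is never parsed as a formal JSON Schema; both the notebook and the populator read it as raw text and paste it into the OpenAI prompt, which is why the inline draft-03-style "required": true flags are tolerated here. A sketch of that use, matching the populator code later in this diff:

```python
import json

# The file is embedded verbatim in the prompt; the model, not a validator,
# interprets the "required" flags. Formal validation happens afterwards
# against watch_schema.json via schema_validator.validate_schema.
with open("src/data_collection/query_schema.json") as f:
    output_schema_str = f.read()

relevant_data = json.dumps({"title": "[WTS] example post"})  # placeholder payload
prompt = (
    f"Given the data: {relevant_data}, construct a JSON object that adheres "
    f"to the specified output schema. Output schema: {output_schema_str}"
)
```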
diff --git a/src/data_collection/reddit_crawler.ipynb b/src/data_collection/reddit_crawler.ipynb
deleted file mode 100644
index 0c0d28f..0000000
--- a/src/data_collection/reddit_crawler.ipynb
+++ /dev/null
@@ -1,326 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "code",
-   "execution_count": 2,
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "Lad wrote a Python script to download Alexa voice recordings, he didn't expect this email.\n",
-      "This post has:\n",
-      "I redesign the Python logo to make it more modern\n",
-      "Automate the boring stuff with python - tinder\n",
-      "Just finished programming and building my own smart mirror in python, wrote all of the code myself and implemented my own voice control and facial recognition features\n"
-     ]
-    }
-   ],
-   "source": [
-    "import praw\n",
-    "import os\n",
-    "import json\n",
-    "import time\n",
-    "\n",
-    "from supabase import create_client, Client\n",
-    "\n",
-    "# Supabase setup\n",
-    "url: str = os.environ.get('SUPABASE_WATCHDB_URL')\n",
-    "key: str = os.environ.get('SUPABASE_WATCHDB_SERVICE_ROLE_KEY')\n",
-    "supabase: Client = create_client(url, key)\n",
-    "\n",
-    "# Reddit API Credentials\n",
-    "client_id = os.environ.get('REDDIT_APP_ID')\n",
-    "client_secret = os.environ.get('REDDIT_APP_KEY')\n",
-    "user_agent = 'User-Agent:chrono-codex-server:v1 (by /u/ChronoCrawler)'\n",
-    "\n",
-    "# Initialize PRAW with your credentials\n",
-    "reddit = praw.Reddit(client_id=client_id,\n",
-    "                     client_secret=client_secret,\n",
-    "                     user_agent=user_agent)\n"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 5,
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stderr",
-     "output_type": "stream",
-     "text": [
-      "2024-02-06 17:53:19,214:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 409 Conflict\"\n"
-     ]
-    },
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "Skipping insertion for post_id=1akq7lk as it already exists.\n"
-     ]
-    },
-    {
-     "name": "stderr",
-     "output_type": "stream",
-     "text": [
-      "2024-02-06 17:53:19,456:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n",
-      "2024-02-06 17:53:19,743:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n",
-      "2024-02-06 17:53:20,017:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n",
-      "2024-02-06 17:53:20,316:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n",
-      "2024-02-06 17:53:20,715:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n",
-      "2024-02-06 17:53:20,948:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n",
-      "2024-02-06 17:53:21,310:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n",
-      "2024-02-06 17:53:21,584:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n",
-      "2024-02-06 17:53:21,863:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue \"HTTP/1.1 201 Created\"\n"
-     ]
-    }
-   ],
-   "source": [
-    "# The subreddit you want to scrape\n",
-    "subreddit = reddit.subreddit('watchexchange')\n",
-    "\n",
-    "# Initialize a list to store data before saving to disk\n",
-    "data_list = []\n",
-    "\n",
-    "# Fetch the top posts from the subreddit\n",
-    "top_posts = subreddit.top(time_filter=\"hour\", limit=10) # Adjust limit as needed\n",
-    "\n",
-    "\n",
-    "# Push the data collected to Supabase\n",
-    "for post in top_posts:\n",
-    "    post.comments.replace_more(limit=25) # Load all comments\n",
-    "    # comments = [{'userid': comment.author.name, 'comment': comment.body} for comment in post.comments.list()]\n",
-    "    comments = ' | '.join([f\"{comment.author.name}: {comment.body}\" for comment in post.comments.list()])\n",
-    "\n",
-    "    post_data = {\n",
-    "        'post_id': post.id,\n",
-    "        'author_id': post.author.name,\n",
-    "        'title': post.title,\n",
-    "        'url': post.url,\n",
-    "        'comments': comments\n",
-    "    }\n",
-    "    \n",
-    "    try:\n",
-    "        # Attempt to insert post_data into your Supabase table\n",
-    "        data_insert_response = supabase.table('rqueue').insert(post_data).execute()\n",
-    "    except Exception as e:\n",
-    "        if 'duplicate key value violates unique constraint \"rqueue_pkey\"' in str(e):\n",
-    "            print(f\"Skipping insertion for post_id={post_data['post_id']} as it already exists.\")\n",
-    "        else:\n",
-    "            raise\n"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 42,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "from jsonschema import validate\n",
-    "from jsonschema.exceptions import ValidationError\n",
-    "\n",
-    "\n",
-    "def filter_invalid_entries(data, schema):\n",
-    "    \"\"\"\n",
-    "    Removes entries from the data that do not match the schema's type requirements.\n",
-    "    \n",
-    "    :param data: The JSON object to filter.\n",
-    "    :param schema: The schema defining the expected types.\n",
-    "    :return: A new dictionary with only the valid entries according to the schema.\n",
-    "    \"\"\"\n",
-    "    valid_data = {}\n",
-    "    for key, value in data.items():\n",
-    "        expected_type = schema.get(\"properties\", {}).get(key, {}).get(\"type\", None)\n",
-    "        if expected_type:\n",
-    "            if expected_type == \"number\" and value is not None:\n",
-    "                print(key,value)\n",
-    "                try:\n",
-    "                    converted_value = int(value)\n",
-    "                except ValueError:\n",
-    "                    try:\n",
-    "                        converted_value = float(value)\n",
-    "                    except ValueError:\n",
-    "                        continue\n",
-    "                valid_data[key] = converted_value\n",
-    "            elif expected_type == \"string\" and isinstance(value, str):\n",
-    "                valid_data[key] = value\n",
-    "    return valid_data\n",
-    "\n",
-    "\n",
-    "\n",
-    "def validate_schema(data: dict):\n",
-    "    # This code validates the schema of the provided JSON data\n",
-    "    # and drops any entries that do not match the schema, effectively\n",
-    "    # filtering out incorrect data.\n",
-    "\n",
-    "    # Load the schema\n",
-    "    with open('watch_schema.json', 'r') as file:\n",
-    "        schema = json.load(file)\n",
-    "\n",
-    "    json_data = filter_invalid_entries(data, schema)\n",
-    "\n",
-    "    # Validate the JSON, on failure throw exception\n",
-    "    validate(instance=json_data, schema=schema)\n",
-    "    return json_data \n",
-    "    \n"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 44,
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "{'Brand': 'Breitling', 'Model': 'SuperOcean II 36', 'Reference Number': 'A17312', 'Case Material': 'Not specified', 'Case Diameter': None, 'Case Thickness': None, 'Lug Width': None, 'Lug-to-Lug': None, 'Dial Color': 'Not specified', 'Crystal Type': 'Not specified', 'Water Resistance': 'Not specified', 'Movement': 'Not specified', 'Caliber': 'Not specified', 'Movement Type': 'Not specified', 'Power Reserve': 'Not specified', 'Bracelet/Strap Material': 'Not specified', 'Clasp Type': 'Not specified', 'Product Weight': 'Not specified', 'Features': 'Completely full kit with watch roll. 2016 year.', 'Price': 1699, 'Availability': 'Zelle, Venmo, or WU accepted', 'Photo URL': 'https://imgur.com/a/Quq7MB9', 'Merchant Name': 'Pristine_Courage_535', 'Product URL': 'https://www.reddit.com/r/Watchexchange/comments/1akq7lk/wts_breitling_superocean_a17312_white_full_kit/'}\n",
-      "Price 1699\n"
-     ]
-    },
-    {
-     "data": {
-      "text/plain": [
-       "{'Brand': 'Breitling',\n",
-       " 'Model': 'SuperOcean II 36',\n",
-       " 'Reference Number': 'A17312',\n",
-       " 'Case Material': 'Not specified',\n",
-       " 'Dial Color': 'Not specified',\n",
-       " 'Crystal Type': 'Not specified',\n",
-       " 'Water Resistance': 'Not specified',\n",
-       " 'Movement': 'Not specified',\n",
-       " 'Caliber': 'Not specified',\n",
-       " 'Movement Type': 'Not specified',\n",
-       " 'Power Reserve': 'Not specified',\n",
-       " 'Bracelet/Strap Material': 'Not specified',\n",
-       " 'Clasp Type': 'Not specified',\n",
-       " 'Product Weight': 'Not specified',\n",
-       " 'Features': 'Completely full kit with watch roll. 2016 year.',\n",
-       " 'Price': 1699,\n",
-       " 'Availability': 'Zelle, Venmo, or WU accepted',\n",
-       " 'Photo URL': 'https://imgur.com/a/Quq7MB9',\n",
-       " 'Merchant Name': 'Pristine_Courage_535',\n",
-       " 'Product URL': 'https://www.reddit.com/r/Watchexchange/comments/1akq7lk/wts_breitling_superocean_a17312_white_full_kit/'}"
-      ]
-     },
-     "execution_count": 44,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "response_json = json.loads(response.choices[0].message.content)\n",
-    "print(response_json)\n",
-    "validated_response = validate_schema(response_json)\n",
-    "validated_response"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 48,
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stderr",
-     "output_type": "stream",
-     "text": [
-      "2024-02-07 11:37:51,075:INFO - HTTP Request: GET https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue?select=%2A&processed=eq.False&limit=1 \"HTTP/1.1 200 OK\"\n",
-      "2024-02-07 11:37:51,152:INFO - HTTP Request: PATCH https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/rqueue?post_id=eq.1akqglv \"HTTP/1.1 200 OK\"\n"
-     ]
-    },
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "\n",
-      "data=[{'created_at': '2024-02-07T01:53:19.464226+00:00', 'post_id': '1akqglv', 'author_id': 'liuserr', 'title': '[WTS] Sinn 356 on OEM Leather strap', 'url': 'https://i.redd.it/18ictee7d2hc1.jpeg', 'comments': 'AutoModerator: * u/liuserr has 149 Transactions and [this 7-day post history](https://www.reddit.com/r/Watchexchange/wiki/user_post_history/liuserr).\\n* [Click send on this message](https://www.reddit.com/message/compose/?to=WatchExBot&subject=Feedback_Check&message=u/liuserr) to get a detailed look at this user\\'s transaction history. \\n * If the above link doesn\\'t work, send a message containing the username (starting with u/) to the bot, `WatchExBot`\\n* This post: \"[[WTS] Sinn 356 on OEM Leather strap](https://www.reddit.com/r/Watchexchange/comments/1akqglv/wts_sinn_356_on_oem_leather_strap/)\" \\n\\n---\\n\\n**This post may be removed without notice if it does not follow the rules!**\\n\\n* **Pictures** are required. Do you have pictures? \\n* A **timestamp** is required. Do you have a ***handwritten timestamp*** under your item(s)? \\n* A **price** or trade value is required. Do you have a price? \\n \\nIf you make a mistake in your post, ***do not delete and repost.*** Message the mods *first* or your post may be removed and you\\'ll have to wait 7 days to repost.\\n\\n---\\n\\n[Click here](https://www.reddit.com/r/Watchexchange/comments/ihs99w/meta_new_flairbased_feedback/) to learn how to leave feedback that updates your transaction flair.\\n\\n**Avoid banned or unqualified users; do not deal with anyone unless they comment on this thread.** \\n \\nIf you believe you have been scammed, please [fill out this form](https://www.reddit.com/message/compose/?to=r/Watchexchange&subject=I%20Think%20I%20Have%20Been%20Scammed&message=*%20Username%20of%20the%20person%20scamming%20me:%20%0D%0A%0D%0A*%20Link%20to%20the%20thread%20where%20the%20transaction%20originated:%20%0D%0A%0D%0A*%20Link%20to%20screenshots%20uploaded%20to%20imgur%20of%20my%20conversations%20with%20the%20scammer:%20%0D%0A%0D%0A*%20An%20explanation%20of%20the%20situation:%20).\\n\\nThe presence of this message does **not** indicate a need to message the moderators. \\n\\n\\n*I am a bot, and this action was performed automatically. Please [contact the moderators of this subreddit](/message/compose/?to=/r/Watchexchange) if you have any questions or concerns.* | liuserr: For sale is a Sinn 356 that comes on an unworn leather strap. I\\'m selling it because a chronograph is a little too frail for everyday wear. The watch comes on a sandblasted case with very little signs of wear. I believe the day is slightly misaligned, but not by a terrible amount if so as I can barely tell when scrolling through the days.\\n\\nThe watch comes on its OEM black leather strap that has not been worn - there are no creases on the strap holes. It also comes with the inner and outer boxes, instruction manual, and warranty card dated: Jan 2020.\\n\\nModel: Sinn 356 Movement: SW-500 Case Size: 38.5mm Case Thickness: 15mm Lug Width: 20mm Lug to lug: 46mm Water resistance: 100 meters\\n\\nAlbum: https://imgur.com/a/Lbafeue \\nTimestamp: https://i.imgur.com/9OYeF6O.jpg\\n\\nPrice: $1700 shipped CONUS via Zelle, wire transfer, or Paypal F&F for established members. Face to face in the Bay Area is welcomed.\\n\\nNo trades please.', 'processed': False}] count=None\n",
-      "Case Diameter 38.5\n",
-      "Case Thickness 15\n",
-      "Lug Width 20\n",
-      "Lug-to-Lug 46\n",
-      "Price 1700\n"
-     ]
-    }
-   ],
-   "source": [
-    "from openai import OpenAI\n",
-    "import os\n",
-    "import json\n",
-    "\n",
-    "# Set the API key\n",
-    "client = OpenAI(\n",
-    "    api_key= os.environ.get('OPENAI_API_CHRONO_KEY')\n",
-    ")\n",
-    "\n",
-    "with open('query_schema.json') as f:\n",
-    "    output_schema_str = f.read()\n",
-    "\n",
-    "# Fetch data from Supabase queue\n",
-    "try:\n",
-    "    queue_data = supabase.table('rqueue').select('*').eq('processed', False).limit(1).execute()\n",
-    "    if len(queue_data.data) < 2: # Fixed to check for non-empty data\n",
-    "        for item in queue_data.data:\n",
-    "            relevant_data = {key: item[key] for key in [\"author_id\", \"title\", \"url\", \"comments\"]}\n",
-    "            item_json = json.dumps(relevant_data)\n",
-    "            prompt = f\"Given the data: {item_json}, construct a JSON object that adheres to the specified output schema. Output schema: {output_schema_str}\"\n",
-    "            try:\n",
-    "                response = client.chat.completions.create( \n",
-    "                    model=\"gpt-3.5-turbo-0125\", \n",
-    "                    response_format={ \"type\": \"json_object\" },\n",
-    "                    messages=[{\"role\": \"system\", \"content\": \"You are a helpful assistant that outputs valid JSON.>\"}, \n",
-    "                              {\"role\": \"user\", \"content\": prompt}],\n",
-    "                )\n",
-    "                try:\n",
-    "                    response_json = json.loads(response.choices[0].message.content)\n",
-    "                except Exception as e:\n",
-    "                    print(\"Error in openai response: \", e)\n",
-    "\n",
-    "                try:\n",
-    "                    validated_response = validate_schema(response_json)\n",
-    "                    try:\n",
-    "                        # supabase.table(\"watches\").insert([validated_response]).execute()\n",
-    "                        supabase.table(\"rqueue\").update({\"processed\": True}).eq(\"post_id\", item[\"post_id\"]).execute()\n",
-    "                    except Exception as e:\n",
-    "                        print(f\"Failed to push to supabase (watches): {e}\")\n",
-    "                except Exception as e:\n",
-    "                    print(f\"current response could not be validated: {e}\")\n",
-    "            \n",
-    "            except Exception as e:\n",
-    "                print(f\"An OpenAI error occurred: {e}\")\n",
-    "    \n",
-    "\n",
-    "except Exception as e:\n",
-    "    print(f\"Failed to fetch data from Supabase (rqueue): {e}\")\n"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": []
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": "base",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.11.5"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 2
-}
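Review note: stripped of the notebook scaffolding, the queue round-trip this prototype exercised against the rqueue table reduces to the following sketch (the calls are copied from the notebook's own cells; error handling omitted for brevity):

```python
# Fetch up to one unprocessed post, then flip its processed flag.
rows = (
    supabase.table("rqueue")
    .select("*")
    .eq("processed", False)
    .limit(1)
    .execute()
    .data
)
for row in rows:
    supabase.table("rqueue").update({"processed": True}).eq(
        "post_id", row["post_id"]
    ).execute()
```

The new watchdb_populator.py keeps this shape but reads from queued_posts instead of rqueue.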
diff --git a/src/data_collection/schema_validator.py b/src/data_collection/schema_validator.py
index 89fd201..06e9ee0 100644
--- a/src/data_collection/schema_validator.py
+++ b/src/data_collection/schema_validator.py
@@ -12,20 +12,33 @@ def filter_invalid_entries(data, schema):
     :return: A new dictionary with only the valid entries according to the schema.
     """
     valid_data = {}
+    for key in schema.get("properties", {}).keys():
+        required = schema.get("properties", {}).get(key, {}).get("required", False)
+        if required:
+            valid_data[key] = ""
+
     for key, value in data.items():
         expected_type = schema.get("properties", {}).get(key, {}).get("type", None)
         if expected_type:
-            if expected_type == "number" and value is not None:
-                try:
-                    converted_value = int(value)
-                except ValueError:
-                    try:
-                        converted_value = float(value)
-                    except ValueError:
+            try:
+                if expected_type == "number":
+                    if value == None:
+                        continue
+                    valid_data[key] = float(value) if '.' in str(value) else int(value)
+                elif expected_type == "string":
+                    if value == None:
                         continue
-                valid_data[key] = converted_value
-            elif expected_type == "string" and isinstance(value, str):
-                valid_data[key] = value
+                    valid_data[key] = str(value)
+                else:
+                    raise ValueError(f"Unsupported type {expected_type} for key {key}")
+            except (ValueError, TypeError):
+                print(f"Error converting {key} with value {value} to {expected_type}, skipping...")
+                continue
+    required_fields_empty = all(valid_data[key] in [None, ""] for key in valid_data if schema.get("properties", {}).get(key, {}).get("required", True))
+    if required_fields_empty:
+        print("reqfieldemp", json.dumps(valid_data, indent=4))
+
+        raise ValueError("All required fields are either None or empty string.")
     return valid_data
 
@@ -38,10 +51,12 @@ def validate_schema(data: dict):
 
     # Load the schema
     with open('src/data_collection/watch_schema.json', 'r') as file:
         schema = json.load(file)
-    
+
+    # print("BEFORE FILTERING", json.dumps(data, indent=4))
     json_data = filter_invalid_entries(data, schema)
+    # print("AFTER FILTERING", json.dumps(json_data, indent=4))
 
+    # Validate the JSON, on failure throw exception
     validate(instance=json_data, schema=schema)
     return json_data
-
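Review note: the new numeric branch keys the int/float decision off the string form of the value instead of the old try-int-then-float cascade. Isolated as a helper (the function name is hypothetical) to show the behavior the diff implements:

```python
def coerce_number(value):
    # Mirrors the conversion rule in filter_invalid_entries: values whose string
    # form contains '.' become floats, everything else is parsed as int;
    # ValueError/TypeError is caught by the caller, which drops the key.
    return float(value) if '.' in str(value) else int(value)


assert coerce_number("38.5") == 38.5   # string with a dot -> float
assert coerce_number(1700) == 1700     # already an int -> int
assert coerce_number("20") == 20       # numeric string -> int
# coerce_number("5 bar") raises ValueError, so the caller skips the key
```

One caveat worth flagging: validate_schema loads watch_schema.json, whose properties carry no per-property "required" flag (required is a top-level array there), so the new pre-seeding loop appears to be a no-op against that schema.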
diff --git a/src/data_collection/test.ipynb b/src/data_collection/test.ipynb
deleted file mode 100644
index e6b892d..0000000
--- a/src/data_collection/test.ipynb
+++ /dev/null
@@ -1,154 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "code",
-   "execution_count": 1,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "import webtranspose as webt\n",
-    "import os\n",
-    "import json "
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 13,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# # Crawl\n",
-    "# crawl = webt.Crawl(\n",
-    "#     'https://www.seikowatches.com/us-en/watchfinder?page=100',\n",
-    "#     allowed_urls=[],\n",
-    "#     banned_urls=[],\n",
-    "#     max_pages=1,\n",
-    "#     render_js=True,\n",
-    "#     api_key=os.environ['WEBTRANSPOSE_API_KEY']\n",
-    "# )\n",
-    "\n",
-    "# crawl.queue_crawl() # async\n",
-    "\n",
-    "# # crawl.crawl() # sync\n"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 4,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "query = json.load(open('query_schema.json'))\n",
-    "\n",
-    "scraper = webt.Scraper(\n",
-    "    query, \n",
-    "    scraper_id='scrape single seiko page',\n",
-    "    name='seiko_page_scraper',\n",
-    "    render_js=True, \n",
-    "    api_key=os.environ['WEBTRANSPOSE_API_KEY'], # optional, if you want to run on cloud\n",
-    "    # proxy=True, # optional, if you want to use proxy\n",
-    ") \n",
-    "\n",
-    "# out_json = scraper.scrape('https://www.seikowatches.com/us-en/products/presage/spb441j1')"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 2,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "test_json = {'Brand': 'Seiko',\n",
-    " 'Model': 'SPB441J1',\n",
-    " 'Reference Number': 'SPB441J1',\n",
-    " 'Case Material': 'Stainless steel (super hard coating)',\n",
-    " 'Case Diameter': 35.0,\n",
-    " 'Case Thickness': 12.3,\n",
-    " 'Lug Width': 11,\n",
-    " 'Lug-to-Lug': 37.0,\n",
-    " 'Dial Color': 'Enamel',\n",
-    " 'Crystal Type': 'Box shaped sapphire crystal',\n",
-    " 'Water Resistance': '5 bar',\n",
-    " 'Movement': '6R5H',\n",
-    " 'Caliber': '6R5H',\n",
-    " 'Movement Type': 'Automatic with manual winding',\n",
-    " 'Bracelet/Strap Material': 'Calfskin',\n",
-    " 'Product Weight': '59.0g',\n",
-    " 'Price': 1900,\n",
-    " 'Photo URL': 'https://www.seikowatches.com/us-en/-/media/Images/Product--Image/All/Seiko/2023/10/16/11/57/SPB441J1/SPB441J1.png?mh=1000&mw=1000&hash=FED3D8AEDD94CFCC97DDA5671B158436',\n",
-    " 'Merchant Name': 'Seiko Watch Corporation',\n",
-    " 'Product URL': 'https://www.seikowatches.com/us-en/products/presage/spb441j1'}"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 7,
-   "metadata": {},
-   "outputs": [
-    {
-     "name": "stderr",
-     "output_type": "stream",
-     "text": [
-      "2024-02-06 14:25:20,007:INFO - HTTP Request: POST https://gvvfniijngcyqnwvrbal.supabase.co/rest/v1/watches \"HTTP/1.1 409 Conflict\"\n"
-     ]
-    },
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "{'code': '23505', 'details': 'Key (\"Brand\", \"Reference Number\")=(Seiko, SPB441J1) already exists.', 'hint': None, 'message': 'duplicate key value violates unique constraint \"watches_pkey\"'}\n",
-      "data=[{'Brand': 'Seiko', 'Model': 'SPB441J1', 'Reference Number': 'SPB441J1', 'Case Material': 'Stainless steel (super hard coating)', 'Case Diameter': 35, 'Case Thickness': 12.3, 'Lug Width': 11, 'Lug-to-Lug': 37, 'Dial Color': 'Enamel', 'Crystal Type': 'Box shaped sapphire crystal', 'Water Resistance': '5 bar', 'Movement': '6R5H', 'Caliber': '6R5H', 'Movement Type': 'Automatic with manual winding', 'Power Reserve': None, 'Bracelet/Strap Material': 'Calfskin', 'Clasp Type': None, 'Product Weight': '59.0g', 'Features': None, 'Price': 1900, 'Availability': None, 'Photo URL': 'https://www.seikowatches.com/us-en/-/media/Images/Product--Image/All/Seiko/2023/10/16/11/57/SPB441J1/SPB441J1.png?mh=1000&mw=1000&hash=FED3D8AEDD94CFCC97DDA5671B158436', 'Merchant Name': 'Seiko Watch Corporation', 'Product URL': 'https://www.seikowatches.com/us-en/products/presage/spb441j1'}] count=None\n"
-     ]
-    }
-   ],
-   "source": [
-    "from supabase import create_client, Client\n",
-    "\n",
-    "# Supabase project details\n",
-    "SUPABASE_URL = os.environ['SUPABASE_WATCHDB_URL']\n",
-    "SUPABASE_KEY = os.environ['SUPABASE_WATCHDB_SERVICE_ROLE_KEY']\n",
-    "\n",
-    "supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)\n",
-    "\n",
-    "# Your JSON data\n",
-    "data = [\n",
-    "    # Add your JSON objects here, e.g.,\n",
-    "    test_json\n",
-    "]\n",
-    "\n",
-    "# Push data to the 'watches' table\n",
-    "try:\n",
-    "    response = supabase.table(\"watches\").insert(data).execute()\n",
-    "except Exception as e:\n",
-    "    print(e)\n",
-    "    print(response)\n",
-    "# Check response\n",
-    "# if response.error:\n",
-    "#     print(f\"Error: {response.error.message}\")\n",
-    "# else:\n",
-    "#     print(\"Data pushed successfully\")\n"
-   ]
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": "base",
-   "language": "python",
-   "name": "python3"
-  },
-  "language_info": {
-   "codemirror_mode": {
-    "name": "ipython",
-    "version": 3
-   },
-   "file_extension": ".py",
-   "mimetype": "text/x-python",
-   "name": "python",
-   "nbconvert_exporter": "python",
-   "pygments_lexer": "ipython3",
-   "version": "3.11.5"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 2
-}
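Review note: the 409 Conflict captured in this notebook is the watches_pkey unique violation (Postgres code 23505) on repeat inserts. If overwriting an existing row were acceptable, an upsert would avoid the error path entirely; a sketch, assuming the installed supabase-py exposes upsert on the table builder:

```python
# Replace the failing insert with an upsert: existing (Brand, Reference Number)
# rows are updated in place instead of raising 23505.
response = supabase.table("watches").upsert(test_json).execute()
```

Whether to skip duplicates (as the populator below does) or overwrite them is a design choice this notebook left open.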
diff --git a/src/data_collection/watch_schema.json b/src/data_collection/watch_schema.json
index 25f32ee..12af2db 100644
--- a/src/data_collection/watch_schema.json
+++ b/src/data_collection/watch_schema.json
@@ -3,32 +3,33 @@
     "properties": {
         "Brand": {"type": "string"},
         "Model": {"type": "string"},
-        "Reference Number": {"type": "string"},
-        "Case Material": {"type": "string"},
-        "Case Diameter": {"type": "number"},
-        "Case Thickness": {"type": "number"},
-        "Lug Width": {"type": "number"},
-        "Lug-to-Lug": {"type": "number"},
-        "Dial Color": {"type": "string"},
-        "Crystal Type": {"type": "string"},
-        "Water Resistance": {"type": "string"},
+        "Reference_Number": {"type": "string"},
+        "Case_Material": {"type": "string"},
+        "Case_Diameter": {"type": "number"},
+        "Case_Thickness": {"type": "number"},
+        "Lug_Width": {"type": "number"},
+        "Lug_to_Lug": {"type": "number"},
+        "Dial_Color": {"type": "string"},
+        "Crystal_Type": {"type": "string"},
+        "Water_Resistance": {"type": "string"},
         "Movement": {"type": "string"},
         "Caliber": {"type": "string"},
-        "Movement Type": {"type": "string"},
-        "Power Reserve": {"type": "string"},
-        "Bracelet/Strap Material": {"type": "string"},
-        "Clasp Type": {"type": "string"},
-        "Product Weight": {"type": "string"},
+        "Movement_Type": {"type": "string"},
+        "Power_Reserve": {"type": "string"},
+        "Bracelet_Strap_Material": {"type": "string"},
+        "Clasp_Type": {"type": "string"},
+        "Product_Weight": {"type": "string"},
         "Features": {"type": "string"},
         "Price": {"type": "number"},
         "Availability": {"type": "string"},
-        "Photo URL": {"type": "string"},
-        "Merchant Name": {"type": "string"},
-        "Product URL": {"type": "string"}
+        "Photo_URL": {"type": "string"},
+        "Merchant_Name": {"type": "string"},
+        "Product_URL": {"type": "string"}
     },
     "required": [
         "Brand",
-        "Model"
+        "Model",
+        "Reference_Number"
    ],
     "additionalProperties": false
 }
\ No newline at end of file
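Review note: unlike query_schema.json, this file remains a standard JSON Schema (top-level required array, additionalProperties: false), now with underscore key names and Reference_Number added to required. A quick validation sketch against the tightened schema (sample values taken from the Breitling output earlier in this diff):

```python
import json

from jsonschema import validate  # raises ValidationError on failure

with open("src/data_collection/watch_schema.json") as f:
    schema = json.load(f)

# The three required keys are present; any key not listed under "properties"
# would now be rejected by additionalProperties: false.
sample = {
    "Brand": "Breitling",
    "Model": "SuperOcean II 36",
    "Reference_Number": "A17312",
}
validate(instance=sample, schema=schema)
```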
{"type": "string"}, - "Crystal Type": {"type": "string"}, - "Water Resistance": {"type": "string"}, + "Reference_Number": {"type": "string"}, + "Case_Material": {"type": "string"}, + "Case_Diameter": {"type": "number"}, + "Case_Thickness": {"type": "number"}, + "Lug_Width": {"type": "number"}, + "Lug_to_Lug": {"type": "number"}, + "Dial_Color": {"type": "string"}, + "Crystal_Type": {"type": "string"}, + "Water_Resistance": {"type": "string"}, "Movement": {"type": "string"}, "Caliber": {"type": "string"}, - "Movement Type": {"type": "string"}, - "Power Reserve": {"type": "string"}, - "Bracelet/Strap Material": {"type": "string"}, - "Clasp Type": {"type": "string"}, - "Product Weight": {"type": "string"}, + "Movement_Type": {"type": "string"}, + "Power_Reserve": {"type": "string"}, + "Bracelet_Strap_Material": {"type": "string"}, + "Clasp_Type": {"type": "string"}, + "Product_Weight": {"type": "string"}, "Features": {"type": "string"}, "Price": {"type": "number"}, "Availability": {"type": "string"}, - "Photo URL": {"type": "string"}, - "Merchant Name": {"type": "string"}, - "Product URL": {"type": "string"} + "Photo_URL": {"type": "string"}, + "Merchant_Name": {"type": "string"}, + "Product_URL": {"type": "string"} }, "required": [ "Brand", - "Model" + "Model", + "Reference_Number" ], "additionalProperties": false } \ No newline at end of file diff --git a/src/data_collection/watchdb_populator.py b/src/data_collection/watchdb_populator.py index e03e808..e480ada 100644 --- a/src/data_collection/watchdb_populator.py +++ b/src/data_collection/watchdb_populator.py @@ -1,18 +1,22 @@ import argparse import json -import logging # Added logging module +import logging import os - -from crud import create_watch, get_queued_posts, mark_post_as_processed -from database import SessionLocal +from supabase import create_client, Client from jsonschema.exceptions import ValidationError -from models import Watch -from openai import OpenAI +from dotenv import load_dotenv from schema_validator import validate_schema +from openai import OpenAI +from postgrest.exceptions import APIError +load_dotenv() # Configure logging logging.basicConfig(filename="app.log", level=logging.INFO) +# Initialize Supabase client +url: str = os.environ.get("SUPABASE_WATCHDB_URL") +key: str = os.environ.get("SUPABASE_WATCHDB_SERVICE_ROLE_KEY") +supabase: Client = create_client(url, key) def process_queue(num_requests): openai_key = os.environ.get("OPENAI_API_CHRONO_KEY") @@ -22,16 +26,26 @@ def process_queue(num_requests): output_schema_str = f.read() try: - with SessionLocal() as session: - queue_data = get_queued_posts(session, num_requests) + queue_data = supabase.table("queued_posts").select("*").eq("processed", False).limit(num_requests).execute() + + # if queue_data.error: + # raise Exception(queue_data.error.message) + queue_data = queue_data.data except Exception as e: - logging.error(f"Failed to fetch data from Supabase (rqueue): {str(e)}") + logging.error(f"Failed to fetch data from Supabase: {str(e)}") return for item in queue_data: try: - relevant_data = item.get_relevant_data() + relevant_data = json.dumps( + { + "author_id": item['author_id'], + "title": item['title'], + "comments": item['comments'], + "url": item['url'], + } + ) prompt = f"Given the data: {relevant_data}, construct a JSON object that adheres to the specified output schema. 
Output schema: {output_schema_str}" response = client.chat.completions.create( model="gpt-3.5-turbo-0125", @@ -39,7 +53,7 @@ def process_queue(num_requests): messages=[ { "role": "system", - "content": "You are a helpful assistant that outputs valid JSON.>", + "content": "You are a helpful assistant that outputs valid JSON.", }, {"role": "user", "content": prompt}, ], @@ -47,49 +61,34 @@ def process_queue(num_requests): response_json = json.loads(response.choices[0].message.content) logging.info(f"Response JSON: {response_json}") validated_response = validate_schema(response_json) - watch = Watch( - Brand=validated_response.get("Brand", "Unknown"), - Reference_Number=validated_response.get("Reference Number", "None"), - Timestamp=item.created_at, - Model=validated_response.get("Model", None), - Case_Material=validated_response.get("Case Material", None), - Case_Diameter=validated_response.get("Case Diameter", None), - Case_Thickness=validated_response.get("Case Thickness", None), - Lug_Width=validated_response.get("Lug Width", None), - Lug_to_Lug=validated_response.get("Lug-to-Lug", None), - Dial_Color=validated_response.get("Dial Color", None), - Crystal_Type=validated_response.get("Crystal Type", None), - Water_Resistance=validated_response.get("Water Resistance", None), - Movement=validated_response.get("Movement", None), - Caliber=validated_response.get("Caliber", None), - Movement_Type=validated_response.get("Movement Type", None), - Power_Reserve=validated_response.get("Power Reserve", None), - Bracelet_Strap_Material=validated_response.get( - "Bracelet/Strap Material", None - ), - Clasp_Type=validated_response.get("Clasp Type", None), - Product_Weight=validated_response.get("Product Weight", None), - Features=validated_response.get("Features", None), - Price=validated_response.get("Price", None), - Availability=validated_response.get("Availability", None), - Photo_URL=validated_response.get("Photo URL", None), - Merchant_Name=validated_response.get("Merchant Name", None), - Product_URL=validated_response.get("Product URL", None), - ) - with SessionLocal() as session: - create_watch(session, watch) - mark_post_as_processed(session, item.post_id) + # print("validated response: ", validated_response, type(validated_response)) + validated_response['Timestamp']=item['created_at'] + # validated_response['processed'] = True # Mark as processed + # validated_response['post_id'] = item['id'] # Assuming 'id' is the correct field + + # Insert or update the watch data in Supabase + response = supabase.table("watches").insert(validated_response).execute() + + # Mark the original post as processed + response = supabase.table("queued_posts").update({"processed": True}).eq("post_id", item['post_id']).execute() + + except json.JSONDecodeError as json_err: - logging.error(f"Error in parsing the JSON outputted by OpenAI:\n\t {e}") + logging.error(f"Error in parsing the JSON outputted by OpenAI:\n\t {json_err}") except ValidationError as e: logging.error( f"Schema Validation failed, likely missing some data:\n\tjson:{response_json}\n\terr:{e}" ) + except APIError as api_err: + if api_err.code == '23505': + response = supabase.table("queued_posts").update({"processed": True}).eq("post_id", item['post_id']).execute() + print("Duplicate entry, skipping.") + else: + print("API Error: ", api_err) except Exception as e: - logging.error(f"Unkown Exception: {e}") + logging.error(f"Unknown Exception: {e}") raise - if __name__ == "__main__": parser = argparse.ArgumentParser( description="Process queue items and 
format them using OpenAI"