Skip to content

Commit

Permalink
better fault handling
Browse files Browse the repository at this point in the history
  • Loading branch information
aapatni committed Feb 7, 2024
1 parent 6a19290 commit a0e9838
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 44 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ jobs:
REDDIT_APP_KEY: ${{ secrets.REDDIT_APP_KEY }}
SUPABASE_WATCHDB_URL: ${{ secrets.SUPABASE_WATCHDB_URL }}
SUPABASE_WATCHDB_SERVICE_ROLE_KEY: ${{ secrets.SUPABASE_WATCHDB_SERVICE_ROLE_KEY }}
run: python src/data_collection/reddit_crawler.py
run: python src/data_collection/reddit_crawler.py --post_limit 15

- name: Run the watchdb populator
env:
REDDIT_APP_ID: ${{ secrets.REDDIT_APP_ID }}
REDDIT_APP_KEY: ${{ secrets.REDDIT_APP_KEY }}
SUPABASE_WATCHDB_URL: ${{ secrets.SUPABASE_WATCHDB_URL }}
SUPABASE_WATCHDB_SERVICE_ROLE_KEY: ${{ secrets.SUPABASE_WATCHDB_SERVICE_ROLE_KEY }}
run: python src/data_collection/watchdb_populator.py
run: python src/data_collection/watchdb_populator.py --num_requests 25
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
webtranspose-out/
webtranspose-out/
**/__pycache__/
4 changes: 2 additions & 2 deletions src/data_collection/schema_validator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import json

from jsonschema import validate
from jsonschema.exceptions import ValidationError


def filter_invalid_entries(data, schema):
"""
Removes entries from the data that do not match the schema's type requirements.
Expand All @@ -15,7 +16,6 @@ def filter_invalid_entries(data, schema):
expected_type = schema.get("properties", {}).get(key, {}).get("type", None)
if expected_type:
if expected_type == "number" and value is not None:
print(key,value)
try:
converted_value = int(value)
except ValueError:
Expand Down
83 changes: 44 additions & 39 deletions src/data_collection/watchdb_populator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
from openai import OpenAI
from supabase import create_client, Client
from schema_validator import validate_schema
from jsonschema.exceptions import ValidationError
from postgrest.exceptions import APIError

def process_queue(supabase_url, supabase_key, openai_key):
url: str = os.environ.get('SUPABASE_WATCHDB_URL')
key: str = os.environ.get('SUPABASE_WATCHDB_SERVICE_ROLE_KEY')
def handle_exception(e, context=""):
    """Report an exception to stdout, prefixed with an optional context label.

    Args:
        e: The exception (or any printable object) to report.
        context: Short label describing where the failure happened; defaults
            to an empty string, which yields a bare ": <error>" line.
    """
    message = "{}: {}".format(context, e)
    print(message)

def process_queue(num_requests):
supabase_url: str = os.environ.get('SUPABASE_WATCHDB_URL')
supabase_key: str = os.environ.get('SUPABASE_WATCHDB_SERVICE_ROLE_KEY')
openai_key = os.environ.get('OPENAI_API_CHRONO_KEY')

# Supabase setup
Expand All @@ -23,47 +28,47 @@ def process_queue(supabase_url, supabase_key, openai_key):

# Fetch data from Supabase queue
try:
queue_data = supabase.table('rqueue').select('*').eq('processed', False).limit(1).execute()
if len(queue_data.data) < 2: # Fixed to check for non-empty data
for item in queue_data.data:
relevant_data = {key: item[key] for key in ["author_id", "title", "url", "comments"]}
item_json = json.dumps(relevant_data)
prompt = f"Given the data: {item_json}, construct a JSON object that adheres to the specified output schema. Output schema: {output_schema_str}"
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo-0125",
response_format={ "type": "json_object" },
messages=[{"role": "system", "content": "You are a helpful assistant that outputs valid JSON.>"},
{"role": "user", "content": prompt}],
)
try:
response_json = json.loads(response.choices[0].message.content)
except Exception as e:
print("Error in openai response: ", e)
queue_data = supabase.table('rqueue').select('*').eq('processed', False).limit(num_requests).execute()
except Exception as e:
print(f"Failed to fetch data from Supabase (rqueue): {str(e)}")
return

try:
validated_response = validate_schema(response_json)
try:
# supabase.table("watches").insert([validated_response]).execute()
supabase.table("rqueue").update({"processed": True}).eq("post_id", item["post_id"]).execute()
except Exception as e:
print(f"Failed to push to supabase (watches): {e}")
except Exception as e:
print(f"current response could not be validated: {e}")

except Exception as e:
print(f"An OpenAI error occurred: {e}")

for item in queue_data.data:
try:
relevant_data = {key: item[key] for key in ["author_id", "title", "url", "comments"]}
item_json = json.dumps(relevant_data)
prompt = f"Given the data: {item_json}, construct a JSON object that adheres to the specified output schema. Output schema: {output_schema_str}"
response = client.chat.completions.create(
model="gpt-3.5-turbo-0125",
response_format={ "type": "json_object" },
messages=[{"role": "system", "content": "You are a helpful assistant that outputs valid JSON.>"},
{"role": "user", "content": prompt}],
)
response_json = json.loads(response.choices[0].message.content)
validated_response = validate_schema(response_json)
supabase.table("watches").insert([validated_response]).execute()
supabase.table("rqueue").update({"processed": True}).eq("post_id", item["post_id"]).execute()
except json.JSONDecodeError as json_err:
print(f"Error in parsing the JSON outputted by OpenAI:\n\t {e}")
except ValidationError as e:
print(f"Schema Validation failed, likely missing some data:\n\tjson:{response_json}\n\terr:{e}")
except APIError as e:
if api_error.code == '23505':
# there's a duplicate, so let's mark this watch as processed (TODO: let's solve the duplication issue properly)
supabase.table("rqueue").update({"processed": True}).eq("post_id", item["post_id"]).execute()
if "rqueue" in str(e):
print(f"Failed to write processed flag to supabase: {str(e)}")
elif "watches" in str(e):
print(f"Failed to write watch data to supabase: {str(e)}")
except Exception as e:
print(f"Unkown Exception: {e}")
raise

except Exception as e:
print(f"Failed to fetch data from Supabase (rqueue): {e}")

if __name__ == "__main__":
    # CLI entry point: drain up to --num_requests unprocessed rows from the
    # queue and run them through the OpenAI formatting/validation pipeline.
    # Supabase and OpenAI credentials are read from environment variables
    # inside process_queue, so the batch size is the only CLI knob.
    parser = argparse.ArgumentParser(
        description="Process queue items and format them using OpenAI"
    )
    parser.add_argument(
        "--num_requests",
        type=int,
        default=5,  # 'required=True' removed: a required flag made this default dead code
        help="Max number of requests to process from the queue (default: 5)",
    )

    args = parser.parse_args()

    process_queue(args.num_requests)

0 comments on commit a0e9838

Please sign in to comment.