From 266016e46dbe768801dfd96e3dc0bab6568f2cb3 Mon Sep 17 00:00:00 2001
From: MuslemRahimi
Date: Fri, 26 Jul 2024 21:45:48 +0200
Subject: [PATCH] update cron job

---
 app/cron_ownership_stats.py | 49 +++++++++++++++++++++++++++++--------
 app/main.py                 | 38 ++++++++++++++++++++++++++++
 2 files changed, 77 insertions(+), 10 deletions(-)

diff --git a/app/cron_ownership_stats.py b/app/cron_ownership_stats.py
index 827b8a4..edaaa19 100644
--- a/app/cron_ownership_stats.py
+++ b/app/cron_ownership_stats.py
@@ -2,27 +2,56 @@ import ujson
 import time
 import sqlite3
-import pandas as pd
-import numpy as np
-from collections import defaultdict
 import time
 import asyncio
 import aiohttp
-from faker import Faker
+import random
 from tqdm import tqdm
 from dotenv import load_dotenv
 import os
+
+
 
 load_dotenv()
 api_key = os.getenv('FMP_API_KEY')
 
 
-async def get_data(session, symbol):
-    url = f"https://financialmodelingprep.com/api/v4/institutional-ownership/symbol-ownership?symbol={symbol}&includeCurrentQuarter=true&apikey={api_key}"
-    async with session.get(url) as response:
-        data = await response.json()
-        if len(data) > 0:
-            await save_json(symbol, data[0]) #return only the latest ownership stats
+include_current_quarter = False
+
+
+async def get_data(session, symbol, max_retries=3, initial_delay=1):
+    url = f"https://financialmodelingprep.com/api/v4/institutional-ownership/symbol-ownership?symbol={symbol}&includeCurrentQuarter={include_current_quarter}&apikey={api_key}"
+
+    for attempt in range(max_retries):
+        try:
+            async with session.get(url) as response:
+                if response.status == 200:
+                    content_type = response.headers.get('Content-Type', '')
+                    if 'application/json' in content_type:
+                        data = await response.json()
+                        if len(data) > 0:
+                            await save_json(symbol, data[0])
+                        return
+                    else:
+                        print(f"Unexpected content type for {symbol}: {content_type}")
+                elif response.status == 504:
+                    if attempt < max_retries - 1:
+                        delay = initial_delay * (2 ** attempt) + random.uniform(0, 1)
+                        print(f"Gateway Timeout for {symbol}. Retrying in {delay:.2f} seconds...")
+                        await asyncio.sleep(delay)
+                    else:
+                        print(f"Max retries reached for {symbol} after Gateway Timeout")
+                else:
+                    print(f"Error fetching data for {symbol}: HTTP {response.status}")
+                    return
+        except Exception as e:
+            print(f"Error processing {symbol}: {str(e)}")
+            if attempt < max_retries - 1:
+                delay = initial_delay * (2 ** attempt) + random.uniform(0, 1)
+                print(f"Retrying in {delay:.2f} seconds...")
+                await asyncio.sleep(delay)
+            else:
+                print(f"Max retries reached for {symbol}")
 
 
 
 async def save_json(symbol, data):
diff --git a/app/main.py b/app/main.py
index a66bb8e..fc7f823 100755
--- a/app/main.py
+++ b/app/main.py
@@ -2569,6 +2569,44 @@ async def get_options_flow_ticker(data:TickerData, api_key: str = Security(get_a
         headers={"Content-Encoding": "gzip"}
     )
 
+
+'''
+@app.post("/options-flow-feed")
+async def get_options_flow_feed(data: LastOptionId, api_key: str = Security(get_api_key)):
+    last_option_id = data.lastId
+
+    try:
+        with open(f"json/options-flow/feed/data.json", 'rb') as file:
+            all_data = orjson.loads(file.read())
+
+        if len(last_option_id) == 0:
+            res_list = all_data[0:100]
+        else:
+            # Find the index of the element with the last known ID
+            start_index = next((i for i, item in enumerate(all_data) if item["id"] == last_option_id), -1)
+            if start_index == -1:
+                raise ValueError("Last known ID not found in data")
+
+            # Get the next 100 elements
+            res_list = all_data[start_index + 1:start_index + 101]
+
+        # Compress the data
+        compressed_data = gzip.compress(orjson.dumps(res_list))
+
+        return StreamingResponse(
+            io.BytesIO(compressed_data),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"}
+        )
+    except Exception as e:
+        # Log the error for debugging
+        print(f"Error: {str(e)}")
+        return StreamingResponse(
+            io.BytesIO(gzip.compress(orjson.dumps([]))),
+            media_type="application/json",
+            headers={"Content-Encoding": "gzip"}
+        )
+'''
 @app.get("/options-flow-feed")
 async def get_options_flow_feed(api_key: str = Security(get_api_key)):
     try: