diff --git a/app/cron_dark_pool_flow.py b/app/cron_dark_pool_flow.py
index ff3ff20..a9c93be 100644
--- a/app/cron_dark_pool_flow.py
+++ b/app/cron_dark_pool_flow.py
@@ -7,6 +7,8 @@ import pytz
 import requests # Add missing import
 from dateutil.parser import isoparse
 
+from utils.helper import load_latest_json
+
 load_dotenv()
 api_key = os.getenv('UNUSUAL_WHALES_API_KEY')
 
@@ -36,43 +38,21 @@ def get_quote_data(symbol):
     except FileNotFoundError:
         return None
 
-def load_json(file_path):
-    """Load existing JSON data from file."""
-    if os.path.exists(file_path):
-        try:
-            with open(file_path, 'r') as file:
-                return orjson.loads(file.read())
-        except (ValueError, IOError):
-            print(f"Warning: Could not read or parse {file_path}. Starting with an empty list.")
-    return []
 
-def save_latest_ratings(combined_data, json_file_path, limit=2000):
+def save_to_daily_file(data, directory):
+    """Save data to a daily JSON file."""
     try:
-        # Create a set to track unique entries based on a combination of 'ticker' and 'date'
-        seen = set()
-        unique_data = []
-
-        for item in combined_data:
-            identifier = f"{item['trackingID']}"
-            if identifier not in seen:
-                seen.add(identifier)
-                unique_data.append(item)
-
-        # Sort the data by date
-        sorted_data = sorted(unique_data, key=lambda x: datetime.fromisoformat(x['date'].replace('Z', '+00:00')), reverse=True)
-
-        # Keep only the latest `limit` entries
-        latest_data = sorted_data[:limit]
-
-        # Save the trimmed and deduplicated data to the JSON file
-        with open(json_file_path, 'wb') as file:
-            file.write(orjson.dumps(latest_data))
-
-        print(f"Saved {len(latest_data)} unique and latest ratings to {json_file_path}.")
+        # Ensure the directory exists
+        os.makedirs(directory, exist_ok=True)
+        # Generate filename based on today's date
+        date_str = datetime.now().strftime('%Y-%m-%d')
+        file_path = os.path.join(directory, f"{date_str}.json")
+        # Save data to the file
+        with open(file_path, 'wb') as file:
+            file.write(orjson.dumps(data))
+        print(f"{len(data)} datapoints successfully saved to {file_path}")
     except Exception as e:
-        print(f"An error occurred while saving data: {e}")
-
-
+        print(f"Error saving data to file: {e}")
 
 def get_data():
     try:
@@ -83,78 +63,54 @@ def get_data():
         return []
 
 def main():
-    # Load environment variables
-    con = sqlite3.connect('stocks.db')
-    cursor = con.cursor()
-    cursor.execute("PRAGMA journal_mode = wal")
-    cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
-    stock_symbols = [row[0] for row in cursor.fetchall()]
+    # Directory for saving daily historical flow data
+    historical_directory = 'json/dark-pool/historical-flow'
 
-    etf_con = sqlite3.connect('etf.db')
-    etf_cursor = etf_con.cursor()
-    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
-    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
-    total_symbols = stock_symbols + etf_symbols
-    con.close()
-    etf_con.close()
+    # Load the latest JSON file from the directory
+    existing_data = load_latest_json(historical_directory)
+    existing_keys = {item.get('trackingID', None) for item in existing_data}
 
-    json_file_path = 'json/dark-pool/feed/data.json'
-    existing_data = load_json(json_file_path)
-    # Transform existing data into a set of unique trackingIDs
-    existing_keys = {item.get('trackingID',None) for item in existing_data}
+    # Fetch new data from the API
     data = get_data()
-
-    # Prepare results with only new data
     res = []
     for item in data:
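+        # Normalize Berkshire share-class tickers (BRK.A/BRK.B -> BRK-A/BRK-B)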
         symbol = item['ticker']
         if symbol.lower() == 'brk.b':
             item['ticker'] = 'BRK-B'
             symbol = item['ticker']
         if symbol.lower() == 'brk.a':
             item['ticker'] = 'BRK-A'
             symbol = item['ticker']
-        if symbol in total_symbols:
-            quote_data = get_quote_data(symbol)
-            if symbol in stock_symbols:
-                asset_type = 'Stock'
-            else:
-                asset_type = 'ETF'
-
-            try:
-                # Check if the data is already in the file
-                if item['tracking_id'] not in existing_keys:
-                    try:
-                        sector = stock_screener_data_dict[symbol].get('sector', None)
-                    except:
-                        sector = ""
-
-                    volume = float(item['volume'])
-                    size = float(item['size'])
-
-                    size_volume_ratio = round((size / volume) * 100, 2)
-                    size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)
-                    res.append({
-                        'ticker': item['ticker'],
-                        'date': item['executed_at'],
-                        'price': round(float(item['price']),2),
-                        'size': item['size'],
-                        'volume': volume,
-                        'premium': item['premium'],
-                        'sector': sector,
-                        'assetType': asset_type,
-                        'sizeVolRatio': size_volume_ratio,
-                        'sizeAvgVolRatio': size_avg_volume_ratio,
-                        'trackingID': item['tracking_id']
-                    })
-            except Exception as e:
-                print(f"Error processing {symbol}: {e}")
-
-    # Append new data to existing data and combine
+        try:
+            if item['tracking_id'] not in existing_keys:
+                sector = stock_screener_data_dict.get(symbol, {}).get('sector', "")
+                volume = float(item['volume'])
+                size = float(item['size'])
+                quote_data = get_quote_data(symbol) or {}
+                size_volume_ratio = round((size / volume) * 100, 2)
+                size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)
+                res.append({
+                    'ticker': item['ticker'],
+                    'date': item['executed_at'],
+                    'price': round(float(item['price']), 2),
+                    'size': item['size'],
+                    'volume': volume,
+                    'premium': item['premium'],
+                    'sector': sector,
+                    'assetType': 'Stock' if symbol in stock_screener_data_dict else 'ETF',
+                    'sizeVolRatio': size_volume_ratio,
+                    'sizeAvgVolRatio': size_avg_volume_ratio,
+                    'trackingID': item['tracking_id']
+                })
+        except Exception as e:
+            print(f"Error processing {symbol}: {e}")
 
+    # Combine new data with existing data
     combined_data = existing_data + res
 
-    # Save the updated data
-    save_latest_ratings(combined_data, json_file_path)
+    # Save the combined data to a daily file
+    save_to_daily_file(combined_data, historical_directory)
 
 if __name__ == '__main__':
     main()
diff --git a/app/main.py b/app/main.py
index 868559d..97d12fa 100755
--- a/app/main.py
+++ b/app/main.py
@@ -39,6 +39,7 @@ from slowapi.errors import RateLimitExceeded
 from functools import partial
 from datetime import datetime
+from utils.helper import load_latest_json
 
 
 # DB constants & context manager
 
@@ -2940,13 +2941,12 @@ async def get_options_flow_feed(api_key: str = Security(get_api_key)):
         headers={"Content-Encoding": "gzip"}
     )
 
+
 @app.get("/dark-pool-flow-feed")
 async def get_dark_pool_feed(api_key: str = Security(get_api_key)):
-    try:
-        with open(f"json/dark-pool/feed/data.json", 'rb') as file:
-            res_list = orjson.loads(file.read())
-    except:
-        res_list = []
+    directory = "json/dark-pool/historical-flow"
+    res_list = load_latest_json(directory)
+
     data = orjson.dumps(res_list)
     compressed_data = gzip.compress(data)
     return StreamingResponse(
diff --git a/app/utils/__pycache__/helper.cpython-310.pyc b/app/utils/__pycache__/helper.cpython-310.pyc
index a3a84a3..86c3f4d 100644
Binary files a/app/utils/__pycache__/helper.cpython-310.pyc and b/app/utils/__pycache__/helper.cpython-310.pyc differ
diff --git a/app/utils/helper.py b/app/utils/helper.py
index aeb823a..2d62133 100644
--- a/app/utils/helper.py
+++ b/app/utils/helper.py
@@ -1,4 +1,6 @@
 from datetime import datetime, timedelta, time
+import os
+import orjson
 import pytz
 
 def check_market_hours():
@@ -28,3 +30,31 @@ def check_market_hours():
         return True #"Market hours."
     else:
         return False #"Market is closed."
+
+
+def load_latest_json(directory: str):
+    """Load the latest JSON file from a directory based on the filename (assumed to be a date)."""
+    try:
+        latest_file = None
+        latest_date = None
+
+        # Iterate over files in the directory
+        for filename in os.listdir(directory):
+            if filename.endswith('.json'):
+                # Extract date from filename (assumed format 'YYYY-MM-DD.json')
+                file_date = filename.split('.')[0]
+
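+                # ISO 'YYYY-MM-DD' names sort lexicographically, so string comparison finds the newest file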
+                if latest_date is None or file_date > latest_date:
+                    latest_date = file_date
+                    latest_file = filename
+
+        if not latest_file:
+            return []  # No files found
+
+        latest_file_path = os.path.join(directory, latest_file)
+        with open(latest_file_path, 'rb') as file:
+            return orjson.loads(file.read())
+    except Exception as e:
+        print(f"Error loading latest JSON file: {e}")
+        return []
\ No newline at end of file