
Commit

update dark pool cron job
MuslemRahimi committed Dec 26, 2024
1 parent 24409f0 commit dc9be95
Showing 4 changed files with 81 additions and 99 deletions.
141 changes: 47 additions & 94 deletions app/cron_dark_pool_flow.py
@@ -7,6 +7,8 @@
import pytz
import requests # Add missing import
from dateutil.parser import isoparse
from utils.helper import load_latest_json


load_dotenv()
api_key = os.getenv('UNUSUAL_WHALES_API_KEY')
@@ -36,43 +38,21 @@ def get_quote_data(symbol):
except FileNotFoundError:
return None

def load_json(file_path):
"""Load existing JSON data from file."""
if os.path.exists(file_path):
try:
with open(file_path, 'r') as file:
return orjson.loads(file.read())
except (ValueError, IOError):
print(f"Warning: Could not read or parse {file_path}. Starting with an empty list.")
return []

def save_latest_ratings(combined_data, json_file_path, limit=2000):
def save_to_daily_file(data, directory):
"""Save data to a daily JSON file."""
try:
# Create a set to track unique entries based on a combination of 'ticker' and 'date'
seen = set()
unique_data = []

for item in combined_data:
identifier = f"{item['trackingID']}"
if identifier not in seen:
seen.add(identifier)
unique_data.append(item)

# Sort the data by date
sorted_data = sorted(unique_data, key=lambda x: datetime.fromisoformat(x['date'].replace('Z', '+00:00')), reverse=True)

# Keep only the latest `limit` entries
latest_data = sorted_data[:limit]

# Save the trimmed and deduplicated data to the JSON file
with open(json_file_path, 'wb') as file:
file.write(orjson.dumps(latest_data))

print(f"Saved {len(latest_data)} unique and latest ratings to {json_file_path}.")
# Ensure the directory exists
os.makedirs(directory, exist_ok=True)
# Generate filename based on today's date
date_str = datetime.now().strftime('%Y-%m-%d')
file_path = os.path.join(directory, f"{date_str}.json")
# Save data to the file
with open(file_path, 'wb') as file:
file.write(orjson.dumps(data))
print(f"{len(data)} datapoints successfully saved to {file_path}")
except Exception as e:
print(f"An error occurred while saving data: {e}")


print(f"Error saving data to file: {e}")

def get_data():
try:
@@ -83,78 +63,51 @@ def get_data():
return []

def main():
# Load environment variables
con = sqlite3.connect('stocks.db')
cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE symbol NOT LIKE '%.%'")
stock_symbols = [row[0] for row in cursor.fetchall()]
# Directory for saving daily historical flow data
historical_directory = 'json/dark-pool/historical-flow'

etf_con = sqlite3.connect('etf.db')
etf_cursor = etf_con.cursor()
etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
etf_symbols = [row[0] for row in etf_cursor.fetchall()]
total_symbols = stock_symbols + etf_symbols
con.close()
etf_con.close()
# Load the latest JSON file from the directory
existing_data = load_latest_json(historical_directory)
existing_keys = {item.get('trackingID', None) for item in existing_data}

json_file_path = 'json/dark-pool/feed/data.json'
existing_data = load_json(json_file_path)
# Transform existing data into a set of unique trackingIDs
existing_keys = {item.get('trackingID',None) for item in existing_data}
# Fetch new data from the API
data = get_data()

# Prepare results with only new data
res = []
for item in data:
symbol = item['ticker']
if symbol.lower() == 'brk.b':
item['ticker'] = 'BRK-B'
symbol = item['ticker']
if symbol.lower() == 'brk.a':
item['ticker'] = 'BRK-A'
symbol = item['ticker']
if symbol in total_symbols:
quote_data = get_quote_data(symbol)
if symbol in stock_symbols:
asset_type = 'Stock'
else:
asset_type = 'ETF'

try:
# Check if the data is already in the file
if item['tracking_id'] not in existing_keys:
try:
sector = stock_screener_data_dict[symbol].get('sector', None)
except:
sector = ""

volume = float(item['volume'])
size = float(item['size'])

size_volume_ratio = round((size / volume) * 100, 2)
size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)
res.append({
'ticker': item['ticker'],
'date': item['executed_at'],
'price': round(float(item['price']),2),
'size': item['size'],
'volume': volume,
'premium': item['premium'],
'sector': sector,
'assetType': asset_type,
'sizeVolRatio': size_volume_ratio,
'sizeAvgVolRatio': size_avg_volume_ratio,
'trackingID': item['tracking_id']
})
except Exception as e:
print(f"Error processing {symbol}: {e}")

# Append new data to existing data and combine
try:
if item['tracking_id'] not in existing_keys:
sector = stock_screener_data_dict.get(symbol, {}).get('sector', "")
volume = float(item['volume'])
size = float(item['size'])
quote_data = get_quote_data(symbol) or {}
size_volume_ratio = round((size / volume) * 100, 2)
size_avg_volume_ratio = round((size / quote_data.get('avgVolume', 1)) * 100, 2)
res.append({
'ticker': item['ticker'],
'date': item['executed_at'],
'price': round(float(item['price']), 2),
'size': item['size'],
'volume': volume,
'premium': item['premium'],
'sector': sector,
'assetType': 'Stock' if symbol in stock_screener_data_dict else 'ETF',
'sizeVolRatio': size_volume_ratio,
'sizeAvgVolRatio': size_avg_volume_ratio,
'trackingID': item['tracking_id']
})
except Exception as e:
print(f"Error processing {symbol}: {e}")

# Combine new data with existing data
combined_data = existing_data + res
# Save the updated data
save_latest_ratings(combined_data, json_file_path)

# Save the combined data to a daily file
save_to_daily_file(combined_data, historical_directory)

if __name__ == '__main__':
main()
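
Taken together, the rewritten cron job now loads the latest daily snapshot, appends only records whose trackingID has not been seen before, and rewrites today's file under json/dark-pool/historical-flow. The sketch below shows that round trip end to end; the two helpers are simplified re-implementations of save_to_daily_file and load_latest_json from this commit, and the sample record is purely illustrative.

```python
# Sketch of the daily-file round trip used by the updated cron job.
# The two helpers are simplified re-implementations of save_to_daily_file
# (cron_dark_pool_flow.py) and load_latest_json (utils/helper.py); the sample
# record at the bottom is made up for illustration.
import os
from datetime import datetime

import orjson


def save_to_daily_file(data, directory):
    """Write data to <directory>/<YYYY-MM-DD>.json, as the cron script does."""
    os.makedirs(directory, exist_ok=True)
    file_path = os.path.join(directory, f"{datetime.now().strftime('%Y-%m-%d')}.json")
    with open(file_path, 'wb') as file:
        file.write(orjson.dumps(data))


def load_latest_json(directory):
    """Read the newest <YYYY-MM-DD>.json in directory, or [] if none exist."""
    if not os.path.isdir(directory):
        return []
    files = sorted(f for f in os.listdir(directory) if f.endswith('.json'))
    if not files:
        return []
    with open(os.path.join(directory, files[-1]), 'rb') as file:
        return orjson.loads(file.read())


if __name__ == '__main__':
    directory = 'json/dark-pool/historical-flow'             # path used by the cron job
    existing_data = load_latest_json(directory)              # latest saved snapshot
    existing_keys = {item.get('trackingID') for item in existing_data}

    new_items = [{'ticker': 'SPY', 'trackingID': 'demo-1'}]  # stands in for the API response
    res = [item for item in new_items if item['trackingID'] not in existing_keys]

    save_to_daily_file(existing_data + res, directory)       # today's combined snapshot
```
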
10 changes: 5 additions & 5 deletions app/main.py
@@ -39,6 +39,7 @@
from slowapi.errors import RateLimitExceeded
from functools import partial
from datetime import datetime
from utils.helper import load_latest_json

# DB constants & context manager

@@ -2940,13 +2941,12 @@ async def get_options_flow_feed(api_key: str = Security(get_api_key)):
headers={"Content-Encoding": "gzip"}
)


@app.get("/dark-pool-flow-feed")
async def get_dark_pool_feed(api_key: str = Security(get_api_key)):
try:
with open(f"json/dark-pool/feed/data.json", 'rb') as file:
res_list = orjson.loads(file.read())
except:
res_list = []
directory = "json/dark-pool/historical-flow"
res_list = load_latest_json(directory)

data = orjson.dumps(res_list)
compressed_data = gzip.compress(data)
return StreamingResponse(
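
The arguments to StreamingResponse are collapsed in this view. Assuming they mirror the options-flow endpoint directly above (gzipped body plus a Content-Encoding: gzip header), a stripped-down version of the updated handler could look like the sketch below; the API-key dependency from main.py is omitted to keep it self-contained, and the media_type and BytesIO details are assumptions.

```python
# Stripped-down sketch of the updated /dark-pool-flow-feed handler.
# Assumes the collapsed StreamingResponse arguments mirror the options-flow
# endpoint above; the Security(get_api_key) dependency is omitted here.
import gzip
import io

import orjson
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from utils.helper import load_latest_json  # helper added in this commit

app = FastAPI()


@app.get("/dark-pool-flow-feed")
async def get_dark_pool_feed():
    # Serve whatever the cron job wrote most recently; [] if nothing exists yet.
    res_list = load_latest_json("json/dark-pool/historical-flow")
    compressed_data = gzip.compress(orjson.dumps(res_list))
    return StreamingResponse(
        io.BytesIO(compressed_data),
        media_type="application/json",
        headers={"Content-Encoding": "gzip"},
    )
```
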
Binary file modified app/utils/__pycache__/helper.cpython-310.pyc
29 changes: 29 additions & 0 deletions app/utils/helper.py
@@ -1,4 +1,6 @@
from datetime import datetime, timedelta, time
import os
import orjson
import pytz

def check_market_hours():
@@ -28,3 +30,30 @@ def check_market_hours():
return True #"Market hours."
else:
return False #"Market is closed."


def load_latest_json(directory: str):
"""Load the latest JSON file from a directory based on the filename (assumed to be a date)."""
try:
latest_file = None
latest_date = None

# Iterate over files in the directory
for filename in os.listdir(directory):
if filename.endswith('.json'):
# Extract date from filename (assumed format 'YYYY-MM-DD.json')
file_date = filename.split('.')[0]

if latest_date is None or file_date > latest_date:
latest_date = file_date
latest_file = filename

if not latest_file:
return [] # No files found

latest_file_path = os.path.join(directory, latest_file)
with open(latest_file_path, 'rb') as file:
return orjson.loads(file.read())
except Exception as e:
print(f"Error loading latest JSON file: {e}")
return []
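
A side note on the comparison above: because the filenames are zero-padded 'YYYY-MM-DD' strings, plain string comparison (file_date > latest_date) already orders them chronologically. If stricter handling is ever wanted, a hypothetical variant that parses the date out of the filename and skips non-date names could look like the sketch below; it is not part of the commit.

```python
# Optional hardening sketch (not in the commit): resolve the newest file by
# parsing the date out of the filename instead of comparing raw strings.
import os
from datetime import datetime


def latest_json_path(directory: str):
    best_date, best_path = None, None
    for filename in os.listdir(directory):
        if not filename.endswith('.json'):
            continue
        try:
            file_date = datetime.strptime(filename[:-5], '%Y-%m-%d')
        except ValueError:
            continue  # ignore files whose names are not dates
        if best_date is None or file_date > best_date:
            best_date, best_path = file_date, os.path.join(directory, filename)
    return best_path  # None if the directory holds no date-named JSON files
```
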
