diff --git a/app/cron_shareholders.py b/app/cron_shareholders.py
new file mode 100644
index 0000000..598d77e
--- /dev/null
+++ b/app/cron_shareholders.py
@@ -0,0 +1,84 @@
+import aiohttp
+import ujson
+import sqlite3
+import asyncio
+import pandas as pd
+from tqdm import tqdm
+from collections import defaultdict
+
+
+query_template = """
+    SELECT
+        analyst_estimates, income
+    FROM
+        stocks
+    WHERE
+        symbol = ?
+"""
+
+async def save_as_json(symbol, data):
+    with open(f"json/shareholders/{symbol}.json", 'w') as file:
+        ujson.dump(data, file)
+
+
+async def get_data(ticker, stock_symbols, etf_symbols, con, etf_con):
+    if ticker in etf_symbols:
+        table_name = 'etfs'
+    else:
+        table_name = 'stocks'
+
+    query_template = f"""
+        SELECT
+            shareholders
+        FROM
+            {table_name}
+        WHERE
+            symbol = ?
+    """
+    try:
+        df = pd.read_sql_query(query_template, etf_con if table_name == 'etfs' else con, params=(ticker,))
+        shareholders_list = ujson.loads(df.to_dict()['shareholders'][0])
+        # Keys to keep
+        keys_to_keep = ["cik","ownership", "investorName", "weight", "sharesNumber", "marketValue"]
+
+        # Create new list with only the specified keys
+        shareholders_list = [
+            {key: d[key] for key in keys_to_keep}
+            for d in shareholders_list
+        ]
+    except Exception as e:
+        #print(e)
+        shareholders_list = []
+
+    return shareholders_list
+
+
+async def run():
+
+    con = sqlite3.connect('stocks.db')
+    etf_con = sqlite3.connect('etf.db')
+
+    cursor = con.cursor()
+    cursor.execute("PRAGMA journal_mode = wal")
+    cursor.execute("SELECT DISTINCT symbol FROM stocks")
+    stock_symbols = [row[0] for row in cursor.fetchall()]
+
+    etf_cursor = etf_con.cursor()
+    etf_cursor.execute("PRAGMA journal_mode = wal")
+    etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
+    etf_symbols = [row[0] for row in etf_cursor.fetchall()]
+
+    total_symbols = stock_symbols + etf_symbols
+
+    for ticker in tqdm(total_symbols):
+        shareholders_list = await get_data(ticker, stock_symbols, etf_symbols, con, etf_con)
+        if len(shareholders_list) > 0:
+            await save_as_json(ticker, shareholders_list)
+
+    con.close()
+    etf_con.close()
+
+try:
+    asyncio.run(run())
+except Exception as e:
+    print(e)
diff --git a/app/main.py b/app/main.py
index 8f1d96f..73a9115 100755
--- a/app/main.py
+++ b/app/main.py
@@ -1345,52 +1345,20 @@ async def get_fair_price(data: TickerData):
     data = data.dict()
     ticker = data['ticker'].upper()
-
-    cache_key = f"get-shareholders-{ticker}"
+    cache_key = f"shareholders-{ticker}"
     cached_result = redis_client.get(cache_key)
     if cached_result:
-        return StreamingResponse(
-            io.BytesIO(cached_result),
-            media_type="application/json",
-            headers={"Content-Encoding": "gzip"}
-        )
-
-    if ticker in etf_symbols:
-        table_name = 'etfs'
-    else:
-        table_name = 'stocks'
-
-    query_template = f"""
-        SELECT
-            shareholders
-        FROM
-            {table_name}
-        WHERE
-            symbol = ?
- """ + return ujson.loads(cached_result) - df = pd.read_sql_query(query_template, etf_con if table_name == 'etfs' else con, params=(ticker,)) - #con.close() try: - shareholders_list = ujson.loads(df.to_dict()['shareholders'][0])[0:10] + with open(f"json/shareholders/{ticker}.json", 'r') as file: + res = ujson.load(file) except: - shareholders_list = [] - - - res_json = ujson.dumps(shareholders_list).encode('utf-8') - compressed_data = gzip.compress(res_json) - redis_client.set(cache_key, compressed_data) - redis_client.expire(cache_key, 3600 * 3600) # Set cache expiration time to Infinity - - return StreamingResponse( - io.BytesIO(compressed_data), - media_type="application/json", - headers={"Content-Encoding": "gzip"} - ) - - - return shareholders_list + res = [] + redis_client.set(cache_key, ujson.dumps(res)) + redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day + return res @app.post("/cik-data") @@ -2600,7 +2568,7 @@ async def get_wiim(data:TickerData): try: with open(f"json/wiim/company/{ticker}.json", 'r') as file: - res = ujson.load(file) + res = ujson.load(file)[:10] except: res = [] diff --git a/app/ml_models/fundamental_predictor.py b/app/ml_models/fundamental_predictor.py index 35eb392..39530e3 100755 --- a/app/ml_models/fundamental_predictor.py +++ b/app/ml_models/fundamental_predictor.py @@ -135,6 +135,8 @@ def build_model(self): model.add(Conv1D(filters=64, kernel_size=3, padding='same', activation='relu', input_shape=(None, 1))) + model.add(Conv1D(filters=32, kernel_size=3, padding='same', activation='relu', input_shape=(None, 1))) + # First LSTM layer with dropout and batch normalization model.add(LSTM(256, return_sequences=True, kernel_regularizer=regularizers.l2(0.01))) model.add(Dropout(0.5)) @@ -276,7 +278,7 @@ async def test_process(con): start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d") end_date = datetime.today().strftime("%Y-%m-%d") predictor = FundamentalPredictor() - df = await download_data('RDDT', con, start_date, end_date) + df = await download_data('GME', con, start_date, end_date) split_size = int(len(df) * (1-test_size)) test_data = df.iloc[split_size:] #selected_features = [col for col in test_data if col not in ['price','date','Target']] @@ -287,7 +289,7 @@ async def main(): con = sqlite3.connect('../stocks.db') cursor = con.cursor() cursor.execute("PRAGMA journal_mode = wal") - cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 100E9") + cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E9") stock_symbols = [row[0] for row in cursor.fetchall()] print('Number of Stocks') print(len(stock_symbols)) diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 9326e0b..75d7d52 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -77,6 +77,17 @@ def run_analyst_estimate(): ] subprocess.run(command) +def run_shareholders(): + week = datetime.today().weekday() + if week <= 5: + subprocess.run(["python3", "cron_shareholders.py"]) + command = [ + "sudo", "rsync", "-avz", "-e", "ssh", + "/root/backend/app/json/shareholders", + f"root@{useast_ip_address}:/root/backend/app/json" + ] + subprocess.run(command) + def run_cron_market_movers(): week = datetime.today().weekday() if week <= 4: @@ -259,6 +270,7 @@ def run_threaded(job_func): schedule.every().day.at("07:00").do(run_threaded, run_ta_rating).tag('ta_rating_job') schedule.every().day.at("08:00").do(run_threaded, run_cron_insider_trading).tag('insider_trading_job') schedule.every().day.at("09:00").do(run_threaded, 
+schedule.every().day.at("10:00").do(run_threaded, run_shareholders).tag('shareholders_job')
 schedule.every().day.at("13:30").do(run_threaded, run_stockdeck).tag('stockdeck_job')
 schedule.every().day.at("13:40").do(run_threaded, run_analyst_estimate).tag('analyst_estimate_job')