Skip to content

Commit

Permalink
add shareholders cron job
Browse files Browse the repository at this point in the history
  • Loading branch information
MuslemRahimi committed Jun 6, 2024
1 parent 93e14d5 commit 44a64c3
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 43 deletions.
84 changes: 84 additions & 0 deletions app/cron_shareholders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import aiohttp
import ujson
import sqlite3
import asyncio
import pandas as pd
from tqdm import tqdm
from collections import defaultdict


# Module-level SQL template selecting analyst_estimates and income for one
# stock symbol.
# NOTE(review): get_data() builds its own local `query_template`, and no other
# code visible in this file reads this module-level one; it looks like a
# leftover from another cron script. Confirm before removing.
query_template = """
SELECT
analyst_estimates, income
FROM
stocks
WHERE
symbol = ?
"""

async def save_as_json(symbol, data):
    """Serialize ``data`` as JSON into ``json/shareholders/<symbol>.json``.

    The file is opened in write mode, so any existing file for the same
    symbol is overwritten. The target directory is not created here.
    """
    target_path = f"json/shareholders/{symbol}.json"
    with open(target_path, 'w') as handle:
        ujson.dump(data, handle)


async def get_data(ticker, stock_symbols, etf_symbols, con, etf_con):
    """Return the trimmed shareholder records stored for ``ticker``.

    The ticker is read from the ``etfs`` table through ``etf_con`` when it
    appears in ``etf_symbols``, and from the ``stocks`` table through ``con``
    otherwise. The ``shareholders`` column of the first matching row is
    decoded as JSON and every record is reduced to a fixed set of keys.

    ``stock_symbols`` is accepted for call compatibility but is not consulted.

    Returns:
        A list of dicts, one per shareholder record. An empty list is
        returned whenever the lookup, the decoding, or the key selection
        raises (for example no row for the ticker, or a record lacking one
        of the expected keys).
    """
    # ETFs are stored separately; anything not listed as an ETF is a stock.
    table_name = 'etfs' if ticker in etf_symbols else 'stocks'

    # Only the table name is interpolated, and it is one of the two literals
    # chosen above; the ticker itself is bound as a query parameter.
    query_template = f"""
SELECT
shareholders
FROM
{table_name}
WHERE
symbol = ?
"""
    # Fields retained from each shareholder record.
    wanted_keys = ("cik", "ownership", "investorName", "weight", "sharesNumber", "marketValue")

    try:
        connection = etf_con if table_name == 'etfs' else con
        frame = pd.read_sql_query(query_template, connection, params=(ticker,))
        raw_records = ujson.loads(frame.to_dict()['shareholders'][0])

        trimmed = []
        for record in raw_records:
            trimmed.append({key: record[key] for key in wanted_keys})
    except Exception:
        # Best-effort: any failure means "no shareholder data" for this ticker.
        return []

    return trimmed


async def run():
    """Export shareholder data for every stock and ETF symbol to JSON files.

    Opens ``stocks.db`` and ``etf.db``, collects the distinct symbols from
    the ``stocks`` and ``etfs`` tables, and for each symbol fetches its
    shareholder list via ``get_data``. Non-empty lists are written with
    ``save_as_json``; symbols without data produce no file.

    Both database connections are closed even if a query or a file write
    raises part-way through.
    """
    con = sqlite3.connect('stocks.db')
    try:
        etf_con = sqlite3.connect('etf.db')
        try:
            cursor = con.cursor()
            cursor.execute("PRAGMA journal_mode = wal")
            cursor.execute("SELECT DISTINCT symbol FROM stocks")
            stock_symbols = [row[0] for row in cursor.fetchall()]

            etf_cursor = etf_con.cursor()
            etf_cursor.execute("PRAGMA journal_mode = wal")
            etf_cursor.execute("SELECT DISTINCT symbol FROM etfs")
            etf_symbols = [row[0] for row in etf_cursor.fetchall()]

            total_symbols = stock_symbols + etf_symbols

            # get_data() only tests membership in the ETF collection, so hand
            # it a set to avoid a linear list scan for every ticker.
            etf_lookup = set(etf_symbols)

            for ticker in tqdm(total_symbols):
                shareholders_list = await get_data(
                    ticker, stock_symbols, etf_lookup, con, etf_con
                )
                if shareholders_list:
                    await save_as_json(ticker, shareholders_list)
        finally:
            etf_con.close()
    finally:
        con.close()

# Script entry: drive the export coroutine to completion.
try:
    asyncio.run(run())
except Exception as err:
    # Any failure is reported on stdout instead of propagating a traceback.
    print(err)
50 changes: 9 additions & 41 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1345,52 +1345,20 @@ async def get_fair_price(data: TickerData):
data = data.dict()
ticker = data['ticker'].upper()


cache_key = f"get-shareholders-{ticker}"
cache_key = f"shareholders-{ticker}"
cached_result = redis_client.get(cache_key)
if cached_result:
return StreamingResponse(
io.BytesIO(cached_result),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)

if ticker in etf_symbols:
table_name = 'etfs'
else:
table_name = 'stocks'

query_template = f"""
SELECT
shareholders
FROM
{table_name}
WHERE
symbol = ?
"""
return ujson.loads(cached_result)

df = pd.read_sql_query(query_template, etf_con if table_name == 'etfs' else con, params=(ticker,))
#con.close()
try:
shareholders_list = ujson.loads(df.to_dict()['shareholders'][0])[0:10]
with open(f"json/shareholders/{ticker}.json", 'r') as file:
res = ujson.load(file)
except:
shareholders_list = []


res_json = ujson.dumps(shareholders_list).encode('utf-8')
compressed_data = gzip.compress(res_json)
redis_client.set(cache_key, compressed_data)
redis_client.expire(cache_key, 3600 * 3600) # Set cache expiration time to Infinity

return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)


return shareholders_list
res = []

redis_client.set(cache_key, ujson.dumps(res))
redis_client.expire(cache_key, 3600 * 24) # Set cache expiration time to 1 day
return res


@app.post("/cik-data")
Expand Down Expand Up @@ -2600,7 +2568,7 @@ async def get_wiim(data:TickerData):

try:
with open(f"json/wiim/company/{ticker}.json", 'r') as file:
res = ujson.load(file)
res = ujson.load(file)[:10]
except:
res = []

Expand Down
6 changes: 4 additions & 2 deletions app/ml_models/fundamental_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ def build_model(self):

model.add(Conv1D(filters=64, kernel_size=3, padding='same', activation='relu', input_shape=(None, 1)))

model.add(Conv1D(filters=32, kernel_size=3, padding='same', activation='relu', input_shape=(None, 1)))

# First LSTM layer with dropout and batch normalization
model.add(LSTM(256, return_sequences=True, kernel_regularizer=regularizers.l2(0.01)))
model.add(Dropout(0.5))
Expand Down Expand Up @@ -276,7 +278,7 @@ async def test_process(con):
start_date = datetime(2000, 1, 1).strftime("%Y-%m-%d")
end_date = datetime.today().strftime("%Y-%m-%d")
predictor = FundamentalPredictor()
df = await download_data('RDDT', con, start_date, end_date)
df = await download_data('GME', con, start_date, end_date)
split_size = int(len(df) * (1-test_size))
test_data = df.iloc[split_size:]
#selected_features = [col for col in test_data if col not in ['price','date','Target']]
Expand All @@ -287,7 +289,7 @@ async def main():
con = sqlite3.connect('../stocks.db')
cursor = con.cursor()
cursor.execute("PRAGMA journal_mode = wal")
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 100E9")
cursor.execute("SELECT DISTINCT symbol FROM stocks WHERE marketCap >= 500E9")
stock_symbols = [row[0] for row in cursor.fetchall()]
print('Number of Stocks')
print(len(stock_symbols))
Expand Down
12 changes: 12 additions & 0 deletions app/primary_cron_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ def run_analyst_estimate():
]
subprocess.run(command)

# Scheduled job: runs cron_shareholders.py in a subprocess and rsyncs the
# json/shareholders directory to the host at useast_ip_address.
# NOTE(review): indentation is not visible in this view, so whether the rsync
# is gated by the weekday check below cannot be confirmed here.
def run_shareholders():
week = datetime.today().weekday()
# weekday() is 0 for Monday through 6 for Sunday, so this holds Monday-Saturday.
# NOTE(review): run_cron_market_movers uses `week <= 4`; confirm that including
# Saturday is intended for this job.
if week <= 5:
subprocess.run(["python3", "cron_shareholders.py"])
# Copy the generated JSON directory to the remote host over ssh.
command = [
"sudo", "rsync", "-avz", "-e", "ssh",
"/root/backend/app/json/shareholders",
f"root@{useast_ip_address}:/root/backend/app/json"
]
subprocess.run(command)

def run_cron_market_movers():
week = datetime.today().weekday()
if week <= 4:
Expand Down Expand Up @@ -259,6 +270,7 @@ def run_threaded(job_func):
schedule.every().day.at("07:00").do(run_threaded, run_ta_rating).tag('ta_rating_job')
schedule.every().day.at("08:00").do(run_threaded, run_cron_insider_trading).tag('insider_trading_job')
schedule.every().day.at("09:00").do(run_threaded, run_congress_trading).tag('congress_job')
schedule.every().day.at("10:00").do(run_threaded, run_shareholders).tag('shareholders_job')

schedule.every().day.at("13:30").do(run_threaded, run_stockdeck).tag('stockdeck_job')
schedule.every().day.at("13:40").do(run_threaded, run_analyst_estimate).tag('analyst_estimate_job')
Expand Down

0 comments on commit 44a64c3

Please sign in to comment.