Skip to content

Commit

Permalink
add dark pool flow
Browse files Browse the repository at this point in the history
  • Loading branch information
MuslemRahimi committed Jul 10, 2024
1 parent 6c6b8c6 commit 3be300c
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 4 deletions.
93 changes: 93 additions & 0 deletions app/cron_dark_pool_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import time
from datetime import datetime
from GetStartEndDate import GetStartEndDate

import intrinio_sdk as intrinio
import ujson
import sqlite3
import pytz

from dotenv import load_dotenv
import os

# All trade timestamps are reported to users in US-Eastern time.
ny_tz = pytz.timezone('America/New_York')


# Load the Intrinio credentials from the local .env file.
load_dotenv()
api_key = os.getenv('INTRINIO_API_KEY')

# Configure the shared Intrinio client; allow_retries lets the SDK
# re-attempt transient HTTP failures on its own.
intrinio.ApiClient().set_api_key(api_key)
intrinio.ApiClient().allow_retries(True)

def save_json(data):
    """Persist the dark-pool flow records to json/dark-pool/flow/data.json.

    Creates the target directory first so a fresh deployment (or a wiped
    json/ tree) does not crash with FileNotFoundError on the first write.
    """
    path = "json/dark-pool/flow/data.json"
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'w') as file:
        ujson.dump(data, file)


# Parameters for the Intrinio security-trades query.
source = 'cta_a_delayed'  # delayed CTA Tape A feed
start_date, end_date = GetStartEndDate().run()  # trading window helper (project-local)
start_time = ''  # empty string = no intraday lower bound
end_time = ''    # empty string = no intraday upper bound
timezone = 'UTC'
page_size = 1000           # trades per API page
darkpool_only = True       # restrict the feed to off-exchange (dark pool) prints
min_size = 100             # ignore odd lots below 100 shares
count = 0  # NOTE(review): unused at module level; get_data() defines its own local `count`


def get_data():
    """Fetch delayed dark-pool trades from Intrinio, paging through results.

    Returns a list of SDK trade objects. Pagination stops when the API
    reports no further page or after 11 pages. Request failures are retried
    a bounded number of times: the original `except: pass` inside the
    `while True` loop never advanced `next_page` or `count`, so a persistent
    API error spun forever.
    """
    data = []
    next_page = ''   # empty string requests the first page
    pages = 0
    failures = 0
    while True:
        try:
            response = intrinio.SecurityApi().get_security_trades(
                source,
                start_date=start_date, start_time=start_time,
                end_date=end_date, end_time=end_time,
                timezone=timezone, page_size=page_size,
                darkpool_only=darkpool_only, min_size=min_size,
                next_page=next_page,
            )
            data += response.trades

            next_page = response.next_page
            if not next_page or pages == 10:
                break
            pages += 1
        except Exception as e:
            # Best-effort semantics for transient errors, but bail out on a
            # persistently failing endpoint instead of looping forever.
            failures += 1
            if failures >= 3:
                print(f"get_data aborted after repeated errors: {e}")
                break

    return data

def run():
    """Build the dark-pool flow feed.

    Loads the known symbol universe from stocks.db, fetches recent dark-pool
    trades, keeps only trades for known symbols, and writes the enriched
    records via save_json().
    """
    con = sqlite3.connect('stocks.db')
    cursor = con.cursor()
    cursor.execute("SELECT DISTINCT symbol, name FROM stocks")
    stocks = cursor.fetchall()
    con.close()

    symbol_name_map = {row[0]: row[1] for row in stocks}
    # Set membership is O(1) per trade; the original list lookup was O(n)
    # per trade over the whole symbol universe.
    stock_symbols = set(symbol_name_map)
    data = get_data()

    print(len(data))
    # The SDK trade objects expose their fields as private attributes,
    # hence the underscore-prefixed keys after the __dict__ conversion.
    data_dicts = [entry.__dict__ for entry in data]
    # Keep only trades whose symbol exists in our stocks table.
    filtered_data = [entry for entry in data_dicts if entry['_symbol'] in stock_symbols]
    res = [
        {
            'symbol': entry['_symbol'],
            'name': symbol_name_map[entry['_symbol']],
            # Report timestamps in US-Eastern for display.
            'date': entry['_timestamp'].astimezone(ny_tz).isoformat(),
            'price': entry['_price'],
            'volume': entry['_total_volume'],
            'size': entry['_size']
        }
        for entry in filtered_data
    ]

    # Keep the last good snapshot on disk if this fetch returned nothing.
    if res:
        save_json(res)


if __name__ == "__main__":
    # Cron entry point: swallow and report any failure so the scheduler's
    # wrapper script does not see a non-zero exit for a transient error.
    try:
        run()
    except Exception as e:
        print(f"An error occurred: {e}")
31 changes: 30 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2611,7 +2611,7 @@ async def get_wiim(data:TickerData):

try:
with open(f"json/wiim/company/{ticker}.json", 'r') as file:
res = ujson.load(file)[:10]
res = ujson.load(file)[:5]
except:
res = []

Expand Down Expand Up @@ -2921,6 +2921,35 @@ async def get_dark_pool(data:TickerData):
redis_client.expire(cache_key, 3600*3600) # Set cache expiration time to 1 day
return res


@app.get("/dark-pool-flow")
async def get_dark_pool_flow():
    """Serve the latest dark-pool flow snapshot, gzip-compressed.

    Responses are cached in Redis for 15 minutes; on a cache miss the data
    is read from the JSON file produced by cron_dark_pool_flow.py.
    """
    # Fixed typo: key was "dark-flow-flow". The key is internal to this
    # endpoint, so renaming it only forces one cache rebuild.
    cache_key = "dark-pool-flow"

    cached_result = redis_client.get(cache_key)
    if cached_result:
        return StreamingResponse(
            io.BytesIO(cached_result),
            media_type="application/json",
            headers={"Content-Encoding": "gzip"})
    try:
        with open("json/dark-pool/flow/data.json", 'r') as file:
            res = ujson.load(file)
    except Exception:
        # Missing or corrupt snapshot file: serve an empty feed rather
        # than a 500. (Was a bare except, which also caught SystemExit.)
        res = []

    data = ujson.dumps(res).encode('utf-8')
    compressed_data = gzip.compress(data)
    redis_client.set(cache_key, compressed_data)
    redis_client.expire(cache_key, 60 * 15)  # cache for 15 minutes

    return StreamingResponse(
        io.BytesIO(compressed_data),
        media_type="application/json",
        headers={"Content-Encoding": "gzip"}
    )


@app.post("/market-maker")
async def get_market_maker(data:TickerData):
ticker = data.ticker.upper()
Expand Down
21 changes: 18 additions & 3 deletions app/primary_cron_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,19 @@ def run_dark_pool():
]
run_command(command)


def run_dark_pool_flow():
    # Refresh the dark-pool flow JSON and mirror it to the us-east host.
    # weekday(): Monday=0 … Sunday=6, so `<= 5` runs Monday–Saturday.
    # NOTE(review): including Saturday looks deliberate (matches the sibling
    # run_market_maker guard) — confirm.
    week = datetime.today().weekday()
    if week <= 5:
        run_command(["python3", "cron_dark_pool_flow.py"])
        # Push the freshly generated json/dark-pool/flow tree to the
        # us-east mirror over ssh.
        command = [
            "sudo", "rsync", "-avz", "-e", "ssh",
            "/root/backend/app/json/dark-pool/flow",
            f"root@{useast_ip_address}:/root/backend/app/json/dark-pool"
        ]
        run_command(command)


def run_market_maker():
week = datetime.today().weekday()
if week <= 5:
Expand Down Expand Up @@ -436,9 +449,9 @@ def run_threaded(job_func):
schedule.every().day.at("10:15").do(run_threaded, run_share_statistics).tag('share_statistics_job')
schedule.every().day.at("10:30").do(run_threaded, run_sec_filings).tag('sec_filings_job')
schedule.every().day.at("11:00").do(run_threaded, run_executive).tag('executive_job')
schedule.every().day.at("11:30").do(run_threaded, run_retail_volume).tag('retail_volume_job')
schedule.every().day.at("03:00").do(run_threaded, run_retail_volume).tag('retail_volume_job')
schedule.every().day.at("11:45").do(run_threaded, run_clinical_trial).tag('clinical_trial_job')
schedule.every().day.at("12:00").do(run_threaded, run_implied_volatility).tag('implied_volatility_job')
schedule.every().day.at("02:00").do(run_threaded, run_implied_volatility).tag('implied_volatility_job')


schedule.every().day.at("13:30").do(run_threaded, run_stockdeck).tag('stockdeck_job')
Expand All @@ -450,7 +463,7 @@ def run_threaded(job_func):
schedule.every().day.at("15:45").do(run_threaded, run_restart_cache)
schedule.every(2).days.at("01:00").do(run_borrowed_share).tag('borrowed_share_job')

schedule.every().saturday.at("01:00").do(run_threaded, run_market_maker).tag('markt_maker_job')
schedule.every(2).days.at("01:00").do(run_threaded, run_market_maker).tag('markt_maker_job')
schedule.every().saturday.at("05:00").do(run_threaded, run_ownership_stats).tag('ownership_stats_job')


Expand All @@ -464,6 +477,8 @@ def run_threaded(job_func):
schedule.every(1).minutes.do(run_threaded, run_cron_quote).tag('quote_job')
schedule.every(1).minutes.do(run_threaded, run_cron_price_alert).tag('price_alert_job')
schedule.every(15).minutes.do(run_threaded, run_market_moods).tag('market_moods_job')
schedule.every(20).minutes.do(run_threaded, run_dark_pool_flow).tag('dark_pool_flow_job')

schedule.every(2).hours.do(run_threaded, run_fda_calendar).tag('fda_calendar_job')
schedule.every(3).hours.do(run_threaded, run_json_job).tag('json_job')
schedule.every(12).hours.do(run_threaded, run_analyst_rating).tag('analyst_job')
Expand Down
Binary file modified app/quant-analysis/daily_return.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified app/quant-analysis/histogram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified app/quant-analysis/simulation.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 3be300c

Please sign in to comment.