From 3a62667e5e1c3f21574545198eee423006c07c35 Mon Sep 17 00:00:00 2001 From: MuslemRahimi Date: Sat, 20 Jul 2024 12:37:29 +0200 Subject: [PATCH] add dashboard cron job --- app/cron_dashboard.py | 92 +++++++++++++++++++++++++++++++++++++++++ app/cron_market_news.py | 10 ++++- app/cron_wiim.py | 2 +- app/main.py | 10 ++--- app/primary_cron_job.py | 10 +++++ 5 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 app/cron_dashboard.py diff --git a/app/cron_dashboard.py b/app/cron_dashboard.py new file mode 100644 index 0000000..e3e44b8 --- /dev/null +++ b/app/cron_dashboard.py @@ -0,0 +1,92 @@ +import aiohttp +import aiofiles +import ujson +import sqlite3 +import pandas as pd +import asyncio +import pytz +import time +import os +from dotenv import load_dotenv +from datetime import datetime, timedelta + +headers = {"accept": "application/json"} + + + +load_dotenv() +benzinga_api_key = os.getenv('BENZINGA_API_KEY') + + +async def save_json(data): + with open(f"json/dashboard/data.json", 'w') as file: + ujson.dump(data, file) + + +async def get_latest_bezinga_market_news(session): + url = "https://api.benzinga.com/api/v2/news" + querystring = {"token": benzinga_api_key,"channels":"News","pageSize":"10","displayOutput":"full"} + try: + async with session.get(url, params=querystring, headers=headers) as response: + res_list = [] + res = ujson.loads(await response.text()) + for item in res: + res_list.append({'date': item['created'], 'text': item['title'], 'url': item['url']}) + + res_list.sort(key=lambda x: datetime.strptime(x['date'], '%a, %d %b %Y %H:%M:%S %z'), reverse=True) + return res_list + except Exception as e: + #pass + print(e) + + +async def run(): + async with aiohttp.ClientSession() as session: + benzinga_news = await get_latest_bezinga_market_news(session) + try: + with open(f"json/congress-trading/rss-feed/data.json", 'r') as file: + congress_flow = ujson.load(file)[0:4] + except: + congress_flow = [] + try: + with open(f"json/options-flow/feed/data.json", 'r') as file: + options_flow = ujson.load(file) + options_flow = sorted(options_flow, key=lambda x: x['cost_basis'], reverse=True) + options_flow = [{key: item[key] for key in ['cost_basis', 'ticker','assetType', 'date_expiration', 'put_call', 'sentiment', 'strike_price']} for item in options_flow[0:4]] + except: + options_flow = [] + + try: + with open(f"json/wiim/rss-feed/data.json", 'r') as file: + wiim_feed = ujson.load(file) + + except: + wiim_feed = [] + + try: + with open(f"json/market-movers/data.json", 'r') as file: + data = ujson.load(file) + market_mover = {'winner': data['gainers']['1D'][0], 'loser': data['losers']['1D'][0], 'active': data['active']['1D'][0]} + except: + market_mover = {} + + try: + with open(f"json/most-shorted-stocks/data.json", 'r') as file: + data = ujson.load(file)[0] + shorted_stock = {key: data[key] for key in ['symbol', 'shortOutStandingPercent']} + + except: + shorted_stock = {} + + + quick_info = {**market_mover, 'shorted': shorted_stock} + + data = {'quickInfo': quick_info, 'optionsFlow': options_flow, 'congressFlow': congress_flow, 'wiimFeed': wiim_feed, 'marketNews': benzinga_news} + + if len(data) > 0: + await save_json(data) + +try: + asyncio.run(run()) +except Exception as e: + print(e) \ No newline at end of file diff --git a/app/cron_market_news.py b/app/cron_market_news.py index d39e155..b8aefba 100755 --- a/app/cron_market_news.py +++ b/app/cron_market_news.py @@ -10,6 +10,9 @@ finnhub_client = finnhub.Client(api_key=finnhub_api_key) +headers = {"accept": "application/json"} + + ''' async def run(): limit = 200 @@ -34,7 +37,9 @@ async def run(): with open(f"json/market-news/{data_name}.json", 'w') as file: ujson.dump(data, file) ''' -#Finnhub data + + + async def run(): limit = 200 urls = [ @@ -54,12 +59,13 @@ async def run(): with open(f"json/market-news/{data_name}.json", 'w') as file: ujson.dump(data, file) + + general_news = finnhub_client.general_news('general') general_news = [item for item in general_news if item["source"] != "" and item["image"] != ""] with open(f"json/market-news/general-news.json", 'w') as file: ujson.dump(general_news, file) - try: asyncio.run(run()) except Exception as e: diff --git a/app/cron_wiim.py b/app/cron_wiim.py index d8b2a5e..a1a9c76 100755 --- a/app/cron_wiim.py +++ b/app/cron_wiim.py @@ -107,7 +107,7 @@ async def get_endpoint(session, symbol, con): async def get_latest_wiim(session, stock_symbols, etf_symbols): url = "https://api.benzinga.com/api/v2/news" - querystring = {"token": api_key,"channels":"WIIM","pageSize":"20","displayOutput":"full"} + querystring = {"token": api_key,"channels":"WIIM","pageSize":"5","displayOutput":"full"} try: async with session.get(url, params=querystring, headers=headers) as response: diff --git a/app/main.py b/app/main.py index fff833e..76cf4b8 100755 --- a/app/main.py +++ b/app/main.py @@ -2687,21 +2687,21 @@ async def get_wiim(data:TickerData, api_key: str = Security(get_api_key)): redis_client.expire(cache_key, 60*60*2) return res -@app.get("/rss-feed-wiim") -async def get_rss_feed_wiim(api_key: str = Security(get_api_key)): +@app.get("/dashboard-info") +async def get_dashboard_info(api_key: str = Security(get_api_key)): - cache_key = f"rss_feed_wiim" + cache_key = f"dashboard-info" cached_result = redis_client.get(cache_key) if cached_result: return orjson.loads(cached_result) try: - with open(f"json/wiim/rss-feed/data.json", 'rb') as file: + with open(f"json/dashboard/data.json", 'rb') as file: res = orjson.loads(file.read()) except: res = [] redis_client.set(cache_key, orjson.dumps(res)) - redis_client.expire(cache_key, 60*5) # Set cache expiration time to 1 day + redis_client.expire(cache_key, 60*5) return res @app.post("/sentiment-analysis") diff --git a/app/primary_cron_job.py b/app/primary_cron_job.py index 99e9955..ecb885f 100755 --- a/app/primary_cron_job.py +++ b/app/primary_cron_job.py @@ -422,6 +422,15 @@ def run_government_contract(): ] run_command(command) +def run_dashboard(): + run_command(["python3", "cron_dashboard.py"]) + command = [ + "sudo", "rsync", "-avz", "-e", "ssh", + "/root/backend/app/json/dashboard", + f"root@{useast_ip_address}:/root/backend/app/json" + ] + run_command(command) + # Create functions to run each schedule in a separate thread def run_threaded(job_func): job_thread = threading.Thread(target=job_func) @@ -465,6 +474,7 @@ def run_threaded(job_func): schedule.every(1).minutes.do(run_threaded, run_cron_portfolio).tag('portfolio_job') schedule.every(5).minutes.do(run_threaded, run_cron_market_movers).tag('market_movers_job') +schedule.every(5).minutes.do(run_threaded, run_dashboard).tag('dashboard_job') schedule.every(15).minutes.do(run_threaded, run_cron_market_news).tag('market_news_job') schedule.every(10).minutes.do(run_threaded, run_one_day_price).tag('one_day_price_job')