
Commit

update price alert endpoint
MuslemRahimi committed Dec 5, 2024
1 parent 4c133dc commit e2088f9
Showing 2 changed files with 106 additions and 33 deletions.
2 changes: 1 addition & 1 deletion app/cron_options_bubble.py
@@ -97,7 +97,7 @@ def options_bubble_data(chunk):
df['open_interest'] = pd.to_numeric(df['open_interest'], errors='coerce')

df['days_to_expiration'] = (df['date_expiration'] - df['date']).dt.days
df_30d = df[(df['days_to_expiration'] >= 40) & (df['days_to_expiration'] <= 80)]
df_30d = df[(df['days_to_expiration'] >= 0) & (df['days_to_expiration'] <= 1000)]
# Calculate implied volatility for options in the 30-day range
iv_data = []
for _, option in df_30d.iterrows():
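For reference, a toy sketch of the filter change above, under made-up sample data (only the column names and the two sets of bounds come from the diff): the old bounds kept contracts expiring in 40 to 80 days, while the new 0 to 1000-day bounds keep essentially every unexpired contract.

import pandas as pd

# Three made-up contracts, all quoted on 2024-12-05.
df = pd.DataFrame({
    'date': pd.to_datetime(['2024-12-05'] * 3),
    'date_expiration': pd.to_datetime(['2024-12-20', '2025-01-24', '2026-06-19']),
})
df['days_to_expiration'] = (df['date_expiration'] - df['date']).dt.days  # 15, 50, 561

old_window = df[(df['days_to_expiration'] >= 40) & (df['days_to_expiration'] <= 80)]   # keeps only the 50-day contract
new_window = df[(df['days_to_expiration'] >= 0) & (df['days_to_expiration'] <= 1000)]  # keeps all three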
137 changes: 105 additions & 32 deletions app/main.py
@@ -38,6 +38,7 @@
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from functools import partial
from datetime import datetime

# DB constants & context manager

@@ -1256,8 +1257,6 @@ async def get_indicator(data: IndicatorListData, api_key: str = Security(get_api_ke



from datetime import datetime

async def process_watchlist_ticker(ticker, rule_of_list, quote_keys_to_include, screener_dict, etf_symbols, crypto_symbols):
"""Process a single ticker concurrently."""
ticker = ticker.upper()
@@ -1382,48 +1381,103 @@ async def get_watchlist(data: GetWatchList, api_key: str = Security(get_api_key)


@app.post("/get-price-alert")
async def get_price_alert(data: UserId, api_key: str = Security(get_api_key)):
user_id = data.dict()['userId']
async def get_price_alert(data: dict, api_key: str = Security(get_api_key)):
user_id = data.get('userId')
if not user_id:
raise HTTPException(status_code=400, detail="User ID is required")

# Fetch all alerts for the user in a single database call
result = pb.collection("priceAlert").get_full_list(query_params={"filter": f"user='{user_id}' && triggered=false"})
try:
result = pb.collection("priceAlert").get_full_list(
query_params={"filter": f"user='{user_id}' && triggered=false"}
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Database query failed: {str(e)}")

# Extract unique tickers
unique_tickers = {item.symbol for item in result if hasattr(item, 'symbol')}

async def fetch_ticker_data(ticker):
try:
news_task = load_json_async(f"json/market-news/companies/{ticker}.json")
earnings_task = load_json_async(f"json/earnings/next/{ticker}.json")

news_dict, earnings_dict = await asyncio.gather(news_task, earnings_task)

# Process news
news = []
if news_dict:
news = [
{key: value for key, value in item.items() if key not in ['image', 'text']}
for item in news_dict[:5]
]

# Process earnings
earnings = None
if earnings_dict:
earnings = {**earnings_dict, 'symbol': ticker}

return news, earnings
except Exception as e:
print(f"Error fetching data for {ticker}: {e}")
return [], None

# Function to read JSON file asynchronously
async def fetch_quote_data(item):
try:
async with aiofiles.open(f"json/quote/{item.symbol}.json", mode='r') as file:
quote_data = orjson.loads(await file.read())
return {
'symbol': item.symbol,
'name': item.name,
'id': item.id,
'assetType': item.asset_type,
'targetPrice': item.target_price,
'priceWhenCreated': item.price_when_created,
'price': quote_data.get("price"),
'changesPercentage': quote_data.get("changesPercentage"),
'volume': quote_data.get("volume"),
}

return {
'symbol': item.symbol,
'name': getattr(item, 'name', ''),
'id': item.id,
'assetType': getattr(item, 'asset_type', ''),
'targetPrice': getattr(item, 'target_price', None),
'condition': getattr(item, 'condition', '').capitalize(),
'priceWhenCreated': getattr(item, 'price_when_created', None),
'price': quote_data.get("price"),
'changesPercentage': quote_data.get("changesPercentage"),
'volume': quote_data.get("volume"),
}
except FileNotFoundError:
print(f"Quote file not found for {item.symbol}")
return None
except Exception as e:
print(f"Error processing {item.symbol}: {e}")
return None

# Run all fetch_quote_data tasks concurrently
tasks = [fetch_quote_data(item) for item in result]
res_list = [res for res in await asyncio.gather(*tasks) if res]

# Serialize and compress the response data
res = orjson.dumps(res_list)
compressed_data = gzip.compress(res)

return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)

try:
# Run all tasks concurrently
ticker_tasks = [fetch_ticker_data(ticker) for ticker in unique_tickers]
quote_tasks = [fetch_quote_data(item) for item in result]

ticker_results = await asyncio.gather(*ticker_tasks)
quote_results = await asyncio.gather(*quote_tasks)

# Process results
combined_results = [res for res in quote_results if res]
combined_news = [news_item for news, _ in ticker_results for news_item in news]
combined_earnings = [earnings for _, earnings in ticker_results if earnings]

# Final response structure
res = {
'data': combined_results,
'news': combined_news,
'earnings': combined_earnings,
}

# Serialize and compress the response data
res_serialized = orjson.dumps(res)
compressed_data = gzip.compress(res_serialized)
print(combined_earnings)
return StreamingResponse(
io.BytesIO(compressed_data),
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)


except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

def process_option_activity(item):
item['put_call'] = 'Calls' if item['put_call'] == 'CALL' else 'Puts'
@@ -2504,6 +2558,25 @@ async def get_pre_post_quote(data:TickerData, api_key: str = Security(get_api_ke
redis_client.expire(cache_key, 60) # Cache entry expires after 60 seconds
return res

@app.post("/get-quote")
async def get_pre_post_quote(data:TickerData, api_key: str = Security(get_api_key)):
ticker = data.ticker.upper()

cache_key = f"get-quote-{ticker}"
cached_result = redis_client.get(cache_key)
if cached_result:
return orjson.loads(cached_result)

try:
with open(f"json/quote/{ticker}.json", 'rb') as file:
res = orjson.loads(file.read())
except:
res = {}

redis_client.set(cache_key, orjson.dumps(res))
redis_client.expire(cache_key, 60) # Cache entry expires after 60 seconds
return res

@app.post("/bull-bear-say")
async def get_bull_bear_say(data:TickerData, api_key: str = Security(get_api_key)):
ticker = data.ticker.upper()
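As a usage note for the reworked /get-price-alert endpoint above, a minimal client sketch: the base URL, the userId value, and the API-key header name are assumptions not shown in this diff; only the request body field and the gzip-compressed JSON response shape come from the new code.

import requests  # requests decodes the gzip Content-Encoding transparently

resp = requests.post(
    'http://localhost:8000/get-price-alert',  # assumed base URL
    json={'userId': 'USER_RECORD_ID'},        # plain dict body, per the new signature
    headers={'X-API-KEY': 'YOUR_KEY'},        # header name is an assumption
)
payload = resp.json()
# payload now follows the new shape: {'data': [...], 'news': [...], 'earnings': [...]}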
