Merge pull request #30 from golemfactory/ssr-fixed
fixed ssr
cryptobench authored Oct 14, 2024
2 parents 82aba96 + 1ac30f8 commit f7a331a
Showing 3 changed files with 110 additions and 83 deletions.
91 changes: 91 additions & 0 deletions stats-backend/api2/management/commands/relay_monitor.py
@@ -0,0 +1,91 @@
# yourapp/management/commands/relay_monitor.py

import asyncio
import aiohttp
import requests
from django.core.management.base import BaseCommand
from django.db.models import Q
from api2.models import Node
from api2.tasks import bulk_update_node_statuses

class Command(BaseCommand):
    help = 'Monitors relay nodes and listens for events'

    def handle(self, *args, **options):
        self.stdout.write('Starting relay monitor...')
        asyncio.run(self.main())

    async def main(self):
        await self.initial_relay_nodes_scan()
        await self.listen_for_relay_events()

    async def initial_relay_nodes_scan(self):
        base_url = "http://yacn2.dev.golem.network:9000/nodes/"
        nodes_to_update = {}

        for prefix in range(256):
            try:
                response = requests.get(f"{base_url}{prefix:02x}", timeout=5)
                response.raise_for_status()
                data = response.json()

                for node_id, sessions in data.items():
                    node_id = node_id.strip().lower()
                    is_online = bool(sessions) and any('seen' in item for item in sessions if item)
                    nodes_to_update[node_id] = is_online

            except requests.RequestException as e:
                print(f"Error fetching data for prefix {prefix:02x}: {e}")

        # Query the database for all online providers
        online_providers = set(Node.objects.filter(online=True).values_list('node_id', flat=True))

        # Check for providers that are marked as online in the database but not in the relay data
        for provider_id in online_providers:
            if provider_id not in nodes_to_update:
                nodes_to_update[provider_id] = False

        # Convert the dictionary to a list of tuples
        nodes_to_update_list = list(nodes_to_update.items())

        bulk_update_node_statuses.delay(nodes_to_update_list)

    async def listen_for_relay_events(self):
        self.stdout.write('Listening for relay events...')
        url = "http://yacn2.dev.golem.network:9000/events"
        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.get(url) as resp:
                        async for line in resp.content:
                            if line:
                                try:
                                    decoded_line = line.decode('utf-8').strip()
                                    if decoded_line.startswith('event:'):
                                        event_type = decoded_line.split(':', 1)[1].strip()
                                    elif decoded_line.startswith('data:'):
                                        node_id = decoded_line.split(':', 1)[1].strip()
                                        event = {'Type': event_type, 'Id': node_id}
                                        await self.process_event(event)
                                except Exception as e:
                                    self.stdout.write(self.style.ERROR(f"Failed to process event: {e}"))
                except Exception as e:
                    self.stdout.write(self.style.ERROR(f"Connection error: {e}"))
                    await asyncio.sleep(5)  # Wait before reconnecting

    async def process_event(self, event):
        event_type = event.get('Type')
        node_id = event.get('Id')

        if event_type == 'new-node':
            self.stdout.write(f"New node: {node_id}")
            bulk_update_node_statuses.delay([(node_id, True)])
        elif event_type == 'lost-node':
            self.stdout.write(f"Lost node: {node_id}")
            bulk_update_node_statuses.delay([(node_id, False)])

    async def fetch(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=5) as response:
                response.raise_for_status()
                return await response.json()
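For orientation (not part of the commit): the command above treats the relay's /events endpoint as an SSE-style stream of event:/data: line pairs, and it considers a node online when any of its relay sessions carries a 'seen' field. A minimal standalone sketch of both checks follows, using made-up sample data; unlike the command, it initializes event_type to None so a data: line arriving before any event: line is skipped instead of raising a NameError.

# Standalone sketch; the payload and node IDs below are placeholders, not real relay output.

def is_node_online(sessions):
    # Same heuristic as initial_relay_nodes_scan: at least one non-empty
    # session entry that contains a 'seen' field.
    return bool(sessions) and any('seen' in item for item in sessions if item)

def parse_relay_events(lines):
    # SSE-style parsing as in listen_for_relay_events: an 'event:' line sets the
    # type, the following 'data:' line carries the node id.
    event_type = None  # skip stray data: lines seen before any event: line
    for raw in lines:
        line = raw.strip()
        if line.startswith('event:'):
            event_type = line.split(':', 1)[1].strip()
        elif line.startswith('data:') and event_type:
            yield {'Type': event_type, 'Id': line.split(':', 1)[1].strip()}

print(is_node_online([{'seen': '2024-10-14T12:00:00Z'}]))  # True
print(is_node_online([]))                                  # False

sample = [
    "event: new-node",
    "data: 0xaaaa000000000000000000000000000000000000",
    "event: lost-node",
    "data: 0xbbbb000000000000000000000000000000000000",
]
for evt in parse_relay_events(sample):
    print(evt)  # {'Type': 'new-node', 'Id': '0xaaaa...'}, then the lost-node event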
94 changes: 19 additions & 75 deletions stats-backend/api2/tasks.py
@@ -1826,86 +1826,30 @@ def extract_wallets_and_ids():
 from django.db.models import Case, When, Value, BooleanField
 from .models import NodeStatusHistory, Node


 @app.task
 def bulk_update_node_statuses(nodes_data):
     status_history_to_create = []
-    redis_updates = {}
-    offline_nodes = []
-    online_nodes = []
-    nodes_to_create = []
-    for node_id, is_online in nodes_data:
-        latest_status = r.get(f"provider:{node_id}:status")
-
-        if latest_status is None or latest_status.decode() != str(is_online):
-            status_history_to_create.append(
-                NodeStatusHistory(node_id=node_id, is_online=is_online)
-            )
-            redis_updates[f"provider:{node_id}:status"] = str(is_online)
-
-            if is_online:
-                online_nodes.append(node_id)
-            else:
-                offline_nodes.append(node_id)
-
-        # Check if the node exists, if not, prepare to create it
-        if not Node.objects.filter(node_id=node_id).exists():
-            nodes_to_create.append(Node(node_id=node_id, online=is_online))
-
-    if status_history_to_create or nodes_to_create:
-        with transaction.atomic():
-            NodeStatusHistory.objects.bulk_create(status_history_to_create)
-
-            # Create new nodes if any
-            Node.objects.bulk_create(nodes_to_create, ignore_conflicts=True)
-
-            # Efficiently update Node objects for offline and online nodes
-            Node.objects.filter(node_id__in=offline_nodes).update(online=False)
-            Node.objects.filter(node_id__in=online_nodes).update(online=True)
-    if redis_updates:
-        r.mset(redis_updates)
-
-
-
-from .utils import check_node_status
-@app.task
-def fetch_and_update_relay_nodes_online_status():
-    base_url = "http://yacn2.dev.golem.network:9000/nodes/"
-    current_online_nodes = set()
     nodes_to_update = []

-    for prefix in range(256):
-        try:
-            response = requests.get(f"{base_url}{prefix:02x}", timeout=5)
-            response.raise_for_status()
-            data = response.json()
-
-            for node_id, sessions in data.items():
-                node_id = node_id.strip().lower()
-                is_online = bool(sessions) and any('seen' in item for item in sessions if item)
-                current_online_nodes.add(node_id)
-                nodes_to_update.append((node_id, is_online))
-
-        except requests.RequestException as e:
-            print(f"Error fetching data for prefix {prefix:02x}: {e}")
-
-    # Bulk update node statuses
-    bulk_update_node_statuses.delay(nodes_to_update)
-
-    # Check providers that were previously online but not found in the current scan
-    previously_online = set(NodeStatusHistory.objects.filter(
-        is_online=True
-    ).order_by('node_id', '-timestamp').distinct('node_id').values_list('node_id', flat=True))
+    with transaction.atomic():
+        for node_id, is_online in nodes_data:
+            try:
+                node = Node.objects.select_for_update().get(node_id=node_id)
+                if node.online != is_online:
+                    node.online = is_online
+                    nodes_to_update.append(node)
+            except ObjectDoesNotExist:
+                node = Node(node_id=node_id, online=is_online, type="requestor")
+                nodes_to_update.append(node)
+
-    missing_nodes = previously_online - current_online_nodes
-    if missing_nodes:
-        check_missing_nodes.delay(list(missing_nodes))
+            status_history_to_create.append(
+                NodeStatusHistory(node_id=node_id, is_online=is_online)
+            )

+        # Bulk create new nodes and update existing ones
+        Node.objects.bulk_create([n for n in nodes_to_update if n._state.adding], ignore_conflicts=True)
+        Node.objects.bulk_update([n for n in nodes_to_update if not n._state.adding], ['online'])

-@app.task
-def check_missing_nodes(missing_nodes):
-    nodes_to_update = []
-    for node_id in missing_nodes:
-        is_online = check_node_status(node_id)
-        nodes_to_update.append((node_id, is_online))
-
-    bulk_update_node_statuses.delay(nodes_to_update)
+        # Bulk create status history
+        NodeStatusHistory.objects.bulk_create(status_history_to_create)
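As a usage note (an assumption for illustration, not part of the diff): the relay monitor command and any other caller enqueue this task with a list of (node_id, is_online) tuples. A hypothetical invocation, for example from a Django shell:

# Hypothetical invocation; the node IDs are placeholders.
from api2.tasks import bulk_update_node_statuses

updates = [
    ("0xaaaa000000000000000000000000000000000000", True),   # seen on the relay
    ("0xbbbb000000000000000000000000000000000000", False),  # dropped off
]

# .delay() hands the work to a Celery worker; bulk_update_node_statuses.apply(args=(updates,))
# would run it synchronously in-process, which can be convenient for local testing.
bulk_update_node_statuses.delay(updates)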
8 changes: 0 additions & 8 deletions stats-backend/core/celery.py
@@ -75,17 +75,9 @@ def setup_periodic_tasks(sender, **kwargs):
         daily_volume_golem_vs_chain,
         computing_total_over_time,
         extract_wallets_and_ids,
-        fetch_and_update_relay_nodes_online_status,
     )
     v2_offer_scraper.apply_async(args=["ray-on-golem-heads"], queue="yagna", routing_key="yagna")
     v2_offer_scraper.apply_async(queue="yagna", routing_key="yagna")
-
-    sender.add_periodic_task(
-        30,
-        fetch_and_update_relay_nodes_online_status.s(),
-        queue="default",
-        options={"queue": "default", "routing_key": "default"},
-    )
     sender.add_periodic_task(
         60,
         computing_total_over_time.s(),
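With the 30-second periodic task removed, relay node status is driven by the long-running relay_monitor command instead; how that process is supervised is not specified by this commit. A hedged sketch of starting it programmatically (assumes DJANGO_SETTINGS_MODULE is configured; running `python manage.py relay_monitor` directly is the equivalent):

# Hypothetical entrypoint; in practice this would be supervised alongside the
# Celery workers (systemd, Docker, etc.), which this commit does not specify.
import django
from django.core.management import call_command

django.setup()  # requires DJANGO_SETTINGS_MODULE to be set
call_command("relay_monitor")  # blocks while the asyncio loop scans and listens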
