diff --git a/stats-backend/api2/management/commands/relay_monitor.py b/stats-backend/api2/management/commands/relay_monitor.py
new file mode 100644
index 0000000..c2e1ec6
--- /dev/null
+++ b/stats-backend/api2/management/commands/relay_monitor.py
@@ -0,0 +1,91 @@
+# api2/management/commands/relay_monitor.py
+
+import asyncio
+import aiohttp
+import requests
+from django.core.management.base import BaseCommand
+from api2.models import Node
+from api2.tasks import bulk_update_node_statuses
+
+class Command(BaseCommand):
+    help = 'Monitors relay nodes and listens for events'
+
+    def handle(self, *args, **options):
+        self.stdout.write('Starting relay monitor...')
+        asyncio.run(self.main())
+
+    async def main(self):
+        await self.initial_relay_nodes_scan()
+        await self.listen_for_relay_events()
+
+    async def initial_relay_nodes_scan(self):
+        base_url = "http://yacn2.dev.golem.network:9000/nodes/"
+        nodes_to_update = {}
+
+        for prefix in range(256):
+            try:
+                response = requests.get(f"{base_url}{prefix:02x}", timeout=5)
+                response.raise_for_status()
+                data = response.json()
+
+                for node_id, sessions in data.items():
+                    node_id = node_id.strip().lower()
+                    is_online = bool(sessions) and any('seen' in item for item in sessions if item)
+                    nodes_to_update[node_id] = is_online
+
+            except requests.RequestException as e:
+                self.stdout.write(self.style.ERROR(f"Error fetching data for prefix {prefix:02x}: {e}"))
+
+        # Query the database for all online providers
+        online_providers = set(Node.objects.filter(online=True).values_list('node_id', flat=True))
+
+        # Providers marked online in the database but absent from the relay data are now offline
+        for provider_id in online_providers:
+            if provider_id not in nodes_to_update:
+                nodes_to_update[provider_id] = False
+
+        # Convert the dictionary to a list of tuples
+        nodes_to_update_list = list(nodes_to_update.items())
+
+        bulk_update_node_statuses.delay(nodes_to_update_list)
+
+    async def listen_for_relay_events(self):
+        self.stdout.write('Listening for relay events...')
+        url = "http://yacn2.dev.golem.network:9000/events"
+        async with aiohttp.ClientSession() as session:
+            while True:
+                try:
+                    async with session.get(url) as resp:
+                        event_type = None  # most recent 'event:' line seen on the stream
+                        async for line in resp.content:
+                            if line:
+                                try:
+                                    decoded_line = line.decode('utf-8').strip()
+                                    if decoded_line.startswith('event:'):
+                                        event_type = decoded_line.split(':', 1)[1].strip()
+                                    elif decoded_line.startswith('data:') and event_type:
+                                        node_id = decoded_line.split(':', 1)[1].strip()
+                                        event = {'Type': event_type, 'Id': node_id}
+                                        await self.process_event(event)
+                                except Exception as e:
+                                    self.stdout.write(self.style.ERROR(f"Failed to process event: {e}"))
+                except Exception as e:
+                    self.stdout.write(self.style.ERROR(f"Connection error: {e}"))
+                    await asyncio.sleep(5)  # Wait before reconnecting
+
+    async def process_event(self, event):
+        event_type = event.get('Type')
+        node_id = event.get('Id')
+
+        if event_type == 'new-node':
+            self.stdout.write(f"New node: {node_id}")
+            bulk_update_node_statuses.delay([(node_id, True)])
+        elif event_type == 'lost-node':
+            self.stdout.write(f"Lost node: {node_id}")
+            bulk_update_node_statuses.delay([(node_id, False)])
+
+    async def fetch(self, url):
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url, timeout=5) as response:
+                response.raise_for_status()
+                return await response.json()
\ No newline at end of file
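For reference, the `/events` endpoint is consumed as Server-Sent-Events-style `event:`/`data:` line pairs. Below is a minimal sketch of that parsing as a standalone helper; the function name and the sample node id are hypothetical illustrations, not part of the diff:

```python
from typing import Iterable, Iterator

def parse_relay_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield {'Type': ..., 'Id': ...} events from raw 'event:'/'data:' lines."""
    event_type = None
    for raw in lines:
        line = raw.strip()
        if line.startswith('event:'):
            event_type = line.split(':', 1)[1].strip()  # e.g. 'new-node' or 'lost-node'
        elif line.startswith('data:') and event_type:
            yield {'Type': event_type, 'Id': line.split(':', 1)[1].strip()}

# list(parse_relay_events(['event: new-node', 'data: 0x1a2b']))
# -> [{'Type': 'new-node', 'Id': '0x1a2b'}]
```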
diff --git a/stats-backend/api2/tasks.py b/stats-backend/api2/tasks.py
index 9249ea2..c0cc5e2 100644
--- a/stats-backend/api2/tasks.py
+++ b/stats-backend/api2/tasks.py
@@ -1826,86 +1826,30 @@ def extract_wallets_and_ids():
 from django.db.models import Case, When, Value, BooleanField
 from .models import NodeStatusHistory, Node
 
+
 @app.task
 def bulk_update_node_statuses(nodes_data):
     status_history_to_create = []
-    redis_updates = {}
-    offline_nodes = []
-    online_nodes = []
-    nodes_to_create = []
-    for node_id, is_online in nodes_data:
-        latest_status = r.get(f"provider:{node_id}:status")
-
-        if latest_status is None or latest_status.decode() != str(is_online):
-            status_history_to_create.append(
-                NodeStatusHistory(node_id=node_id, is_online=is_online)
-            )
-            redis_updates[f"provider:{node_id}:status"] = str(is_online)
-
-            if is_online:
-                online_nodes.append(node_id)
-            else:
-                offline_nodes.append(node_id)
-
-        # Check if the node exists, if not, prepare to create it
-        if not Node.objects.filter(node_id=node_id).exists():
-            nodes_to_create.append(Node(node_id=node_id, online=is_online))
-
-    if status_history_to_create or nodes_to_create:
-        with transaction.atomic():
-            NodeStatusHistory.objects.bulk_create(status_history_to_create)
-
-            # Create new nodes if any
-            Node.objects.bulk_create(nodes_to_create, ignore_conflicts=True)
-
-            # Efficiently update Node objects for offline and online nodes
-            Node.objects.filter(node_id__in=offline_nodes).update(online=False)
-            Node.objects.filter(node_id__in=online_nodes).update(online=True)
-    if redis_updates:
-        r.mset(redis_updates)
-
-
-
-from .utils import check_node_status
-@app.task
-def fetch_and_update_relay_nodes_online_status():
-    base_url = "http://yacn2.dev.golem.network:9000/nodes/"
-    current_online_nodes = set()
     nodes_to_update = []
-    for prefix in range(256):
-        try:
-            response = requests.get(f"{base_url}{prefix:02x}", timeout=5)
-            response.raise_for_status()
-            data = response.json()
-
-            for node_id, sessions in data.items():
-                node_id = node_id.strip().lower()
-                is_online = bool(sessions) and any('seen' in item for item in sessions if item)
-                current_online_nodes.add(node_id)
-                nodes_to_update.append((node_id, is_online))
-
-        except requests.RequestException as e:
-            print(f"Error fetching data for prefix {prefix:02x}: {e}")
-
-    # Bulk update node statuses
-    bulk_update_node_statuses.delay(nodes_to_update)
-
-    # Check providers that were previously online but not found in the current scan
-    previously_online = set(NodeStatusHistory.objects.filter(
-        is_online=True
-    ).order_by('node_id', '-timestamp').distinct('node_id').values_list('node_id', flat=True))
+
+    with transaction.atomic():
+        for node_id, is_online in nodes_data:
+            try:
+                node = Node.objects.select_for_update().get(node_id=node_id)
+                if node.online != is_online:
+                    node.online = is_online
+                    nodes_to_update.append(node)
+            except Node.DoesNotExist:
+                node = Node(node_id=node_id, online=is_online, type="requestor")
+                nodes_to_update.append(node)
 
-    missing_nodes = previously_online - current_online_nodes
-    if missing_nodes:
-        check_missing_nodes.delay(list(missing_nodes))
+            status_history_to_create.append(
+                NodeStatusHistory(node_id=node_id, is_online=is_online)
+            )
 
+        # Bulk create new nodes and update existing ones
+        Node.objects.bulk_create([n for n in nodes_to_update if n._state.adding], ignore_conflicts=True)
+        Node.objects.bulk_update([n for n in nodes_to_update if not n._state.adding], ['online'])
 
-@app.task
-def check_missing_nodes(missing_nodes):
-    nodes_to_update = []
-    for node_id in missing_nodes:
-        is_online = check_node_status(node_id)
-        nodes_to_update.append((node_id, is_online))
-
-    bulk_update_node_statuses.delay(nodes_to_update)
\ No newline at end of file
+
+        # Bulk create status history
+        NodeStatusHistory.objects.bulk_create(status_history_to_create)
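A note on the rewritten task: Django keeps `instance._state.adding` set to True until an instance has been persisted, which is what lets the task split `nodes_to_update` between `bulk_create` (new rows) and `bulk_update` (existing rows). A usage sketch with hypothetical node ids, mirroring the payload the management command enqueues:

```python
from api2.tasks import bulk_update_node_statuses

# Each entry is a (node_id, is_online) tuple, exactly what
# initial_relay_nodes_scan and process_event send above.
nodes_data = [
    ("0x1a2b3c", True),   # seen in the relay data (hypothetical id)
    ("0x4d5e6f", False),  # previously online, absent from the relay data (hypothetical id)
]
bulk_update_node_statuses.delay(nodes_data)
```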
diff --git a/stats-backend/core/celery.py b/stats-backend/core/celery.py
index dd14593..ff1d0ee 100644
--- a/stats-backend/core/celery.py
+++ b/stats-backend/core/celery.py
@@ -75,17 +75,9 @@ def setup_periodic_tasks(sender, **kwargs):
         daily_volume_golem_vs_chain,
         computing_total_over_time,
         extract_wallets_and_ids,
-        fetch_and_update_relay_nodes_online_status,
     )
     v2_offer_scraper.apply_async(args=["ray-on-golem-heads"], queue="yagna", routing_key="yagna")
     v2_offer_scraper.apply_async(queue="yagna", routing_key="yagna")
-
-    sender.add_periodic_task(
-        30,
-        fetch_and_update_relay_nodes_online_status.s(),
-        queue="default",
-        options={"queue": "default", "routing_key": "default"},
-    )
     sender.add_periodic_task(
         60,
         computing_total_over_time.s(),
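With the 30-second `fetch_and_update_relay_nodes_online_status` beat entry removed, relay polling is driven entirely by the long-running `relay_monitor` command, which therefore has to be supervised as its own process alongside the web app, worker, and beat. A minimal launcher sketch; the `core.settings` module path is an assumption based on the repo layout, not confirmed by this diff:

```python
# run_relay_monitor.py -- hypothetical entrypoint; in production this would
# typically be a systemd unit or container command running
# `python manage.py relay_monitor`.
import os

import django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")  # assumed settings path
django.setup()

from django.core.management import call_command

call_command("relay_monitor")  # blocks: one full prefix scan, then the event stream loop
```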