From cd6d1988486f84751eee53102ec32002763affe5 Mon Sep 17 00:00:00 2001
From: Phillip Jensen
Date: Mon, 14 Oct 2024 13:35:47 +0200
Subject: [PATCH] fixed ssr

---
 .../api2/management/commands/relay_monitor.py |  92 +++++++++++++
 stats-backend/api2/tasks.py                   | 136 +++----------------
 stats-backend/core/celery.py                  |   2 -
 3 files changed, 113 insertions(+), 117 deletions(-)
 create mode 100644 stats-backend/api2/management/commands/relay_monitor.py

diff --git a/stats-backend/api2/management/commands/relay_monitor.py b/stats-backend/api2/management/commands/relay_monitor.py
new file mode 100644
index 0000000..c2e1ec6
--- /dev/null
+++ b/stats-backend/api2/management/commands/relay_monitor.py
@@ -0,0 +1,92 @@
+# api2/management/commands/relay_monitor.py
+
+import asyncio
+import aiohttp
+import requests
+from django.core.management.base import BaseCommand
+from django.db.models import Q
+from api2.models import Node
+from api2.tasks import bulk_update_node_statuses
+
+class Command(BaseCommand):
+    help = 'Monitors relay nodes and listens for events'
+
+    def handle(self, *args, **options):
+        self.stdout.write('Starting relay monitor...')
+        asyncio.run(self.main())
+
+    async def main(self):
+        await self.initial_relay_nodes_scan()
+        await self.listen_for_relay_events()
+
+    async def initial_relay_nodes_scan(self):
+        base_url = "http://yacn2.dev.golem.network:9000/nodes/"
+        nodes_to_update = {}
+
+        for prefix in range(256):
+            try:
+                response = requests.get(f"{base_url}{prefix:02x}", timeout=5)
+                response.raise_for_status()
+                data = response.json()
+
+                for node_id, sessions in data.items():
+                    node_id = node_id.strip().lower()
+                    is_online = bool(sessions) and any('seen' in item for item in sessions if item)
+                    nodes_to_update[node_id] = is_online
+
+            except requests.RequestException as e:
+                print(f"Error fetching data for prefix {prefix:02x}: {e}")
+
+        # Query the database for all online providers
+        online_providers = set(Node.objects.filter(online=True).values_list('node_id', flat=True))
+
+        # Check for providers that are marked as online in the database but not in the relay data
+        for provider_id in online_providers:
+            if provider_id not in nodes_to_update:
+                nodes_to_update[provider_id] = False
+
+        # Convert the dictionary to a list of tuples
+        nodes_to_update_list = list(nodes_to_update.items())
+
+        bulk_update_node_statuses.delay(nodes_to_update_list)
+
+    async def listen_for_relay_events(self):
+        self.stdout.write('Listening for relay events...')
+        url = "http://yacn2.dev.golem.network:9000/events"
+        event_type = None  # guard: a data: line arriving before any event: line must not raise NameError
+        async with aiohttp.ClientSession() as session:
+            while True:
+                try:
+                    async with session.get(url) as resp:
+                        async for line in resp.content:
+                            if line:
+                                try:
+                                    decoded_line = line.decode('utf-8').strip()
+                                    if decoded_line.startswith('event:'):
+                                        event_type = decoded_line.split(':', 1)[1].strip()
+                                    elif decoded_line.startswith('data:'):
+                                        node_id = decoded_line.split(':', 1)[1].strip()
+                                        event = {'Type': event_type, 'Id': node_id}
+                                        await self.process_event(event)
+                                except Exception as e:
+                                    self.stdout.write(self.style.ERROR(f"Failed to process event: {e}"))
+                except Exception as e:
+                    self.stdout.write(self.style.ERROR(f"Connection error: {e}"))
+                    await asyncio.sleep(5)  # Wait before reconnecting
+
+    async def process_event(self, event):
+        event_type = event.get('Type')
+        node_id = event.get('Id')
+
+        if event_type == 'new-node':
+            self.stdout.write(f"New node: {node_id}")
+            bulk_update_node_statuses.delay([(node_id, True)])
+        elif event_type == 'lost-node':
+            self.stdout.write(f"Lost node: {node_id}")
+            bulk_update_node_statuses.delay([(node_id, False)])
+
+    async def fetch(self, url):
+        async with aiohttp.ClientSession() as session:
+            async with session.get(url, timeout=5) as response:
+                response.raise_for_status()
+                return await response.json()
\ No newline at end of file
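Note on the event stream (not part of the patch): the command parses /events as a Server-Sent-Events-style feed, where an `event:` line names the event type and the following `data:` line carries the node id. A minimal sketch of that pairing logic as a standalone function, so it can be exercised without a live relay (`parse_relay_lines` is an illustrative helper, not in the patch):

    def parse_relay_lines(lines):
        """Yield {'Type': ..., 'Id': ...} events from SSE-style text lines."""
        event_type = None  # a data: line seen before any event: line is skipped
        for raw in lines:
            line = raw.strip()
            if line.startswith('event:'):
                event_type = line.split(':', 1)[1].strip()
            elif line.startswith('data:') and event_type:
                yield {'Type': event_type, 'Id': line.split(':', 1)[1].strip()}

    # Example with made-up node ids: two events recovered from a raw line stream.
    sample = ['event: new-node', 'data: 0x11', 'event: lost-node', 'data: 0x22']
    assert [e['Type'] for e in parse_relay_lines(sample)] == ['new-node', 'lost-node']

The command itself runs as an ordinary long-lived process via `python manage.py relay_monitor`. Also worth noting: initial_relay_nodes_scan makes blocking requests.get calls inside an async method, while the async fetch helper that could replace them is currently unused.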
diff --git a/stats-backend/api2/tasks.py b/stats-backend/api2/tasks.py
index aa0aca1..f83fcd5 100644
--- a/stats-backend/api2/tasks.py
+++ b/stats-backend/api2/tasks.py
@@ -1825,132 +1825,38 @@ def extract_wallets_and_ids():
 from django.db import transaction
 from django.db.models import Case, When, Value, BooleanField
 from .models import NodeStatusHistory, Node
-from django.db import IntegrityError
+from django.db import transaction
+from django.db.models import F
+from django.core.exceptions import ObjectDoesNotExist
+import redis
 
 @app.task
 def bulk_update_node_statuses(nodes_data):
     status_history_to_create = []
-    redis_updates = {}
-    offline_nodes = []
-    online_nodes = []
-    for node_id, is_online in nodes_data:
-        latest_status = r.get(f"provider:{node_id}:status")
-
-        if latest_status is None or latest_status.decode() != str(is_online):
+    nodes_to_update = []
+
+    with transaction.atomic():
+        for node_id, is_online in nodes_data:
             try:
-                node, created = Node.objects.get_or_create(
-                    node_id=node_id,
-                    defaults={'online': is_online}
-                )
-                if created:
-                    node.type = "requestor"
-                    node.save()
-            except IntegrityError:
-                # If creation fails due to race condition, try to get the object
-                try:
-                    node = Node.objects.get(node_id=node_id)
-                except Node.DoesNotExist:
-                    print(f"Node {node_id} not found")
-                    # If still not found, create a new object
-                    node = Node(node_id=node_id, online=is_online, type="requestor")
-                    node.save()
-            node.online = is_online
-            node.save()
+                node = Node.objects.select_for_update().get(node_id=node_id)
+                if node.online != is_online:
+                    node.online = is_online
+                    nodes_to_update.append(node)
+            except ObjectDoesNotExist:
+                node = Node(node_id=node_id, online=is_online, type="requestor")
+                nodes_to_update.append(node)
+
             status_history_to_create.append(
                 NodeStatusHistory(node_id=node_id, is_online=is_online)
             )
-            redis_updates[f"provider:{node_id}:status"] = str(is_online)
-
-            if is_online:
-                online_nodes.append(node_id)
-            else:
-                offline_nodes.append(node_id)
-    if status_history_to_create:
-        with transaction.atomic():
-            NodeStatusHistory.objects.bulk_create(status_history_to_create)
-
-            # Efficiently update Node objects for offline nodes
-            Node.objects.filter(node_id__in=offline_nodes).update(online=False)
-            Node.objects.filter(node_id__in=online_nodes).update(online=True)
-    if redis_updates:
-        r.mset(redis_updates)
+        # Bulk create new nodes and update existing ones
+        Node.objects.bulk_create([n for n in nodes_to_update if n._state.adding], ignore_conflicts=True)
+        Node.objects.bulk_update([n for n in nodes_to_update if not n._state.adding], ['online'])
 
+        # Bulk create status history
+        NodeStatusHistory.objects.bulk_create(status_history_to_create)
 
-from .utils import check_node_status
-import aiohttp
-import asyncio
-import json
-from celery.utils.log import get_task_logger
-from celery_singleton import Singleton
-
-
-@app.task(base=Singleton, bind=True, max_retries=None)
-def listen_for_relay_events(self):
-    try:
-        asyncio.run(event_listener())
-    except Exception as exc:
-        print(f"listen_for_relay_events task failed: {exc}")
-        self.retry(countdown=5, exc=exc)  # Retry after 5 seconds
-
-async def event_listener():
-    url = "http://yacn2.dev.golem.network:9000/events"
-    async with aiohttp.ClientSession() as session:
-        async with session.get(url) as resp:
-            async for line in resp.content:
-                if line:
-                    try:
-                        decoded_line = line.decode('utf-8').strip()
-                        if decoded_line.startswith('event:'):
-                            event_type = decoded_line.split(':', 1)[1].strip()
-                        elif decoded_line.startswith('data:'):
-                            node_id = decoded_line.split(':', 1)[1].strip()
-                            event = {'Type': event_type, 'Id': node_id}
-                            process_event(event)
-                    except Exception as e:
-                        print(f"Failed to process event: {e}")
-
-def process_event(event):
-    event_type = event.get('Type')
-    node_id = event.get('Id')
-
-    if event_type == 'new-node':
-        print(f"New node: {node_id}")
-        bulk_update_node_statuses.delay([(node_id, True)])
-    elif event_type == 'lost-node':
-        print(f"Lost node: {node_id}")
-        bulk_update_node_statuses.delay([(node_id, False)])
-
-@app.task
-def initial_relay_nodes_scan():
-    base_url = "http://yacn2.dev.golem.network:9000/nodes/"
-    nodes_to_update = []
-    for prefix in range(256):
-        try:
-            response = requests.get(f"{base_url}{prefix:02x}", timeout=5)
-            response.raise_for_status()
-            data = response.json()
-
-            for node_id, sessions in data.items():
-                node_id = node_id.strip().lower()
-                is_online = bool(sessions) and any('seen' in item for item in sessions if item)
-                nodes_to_update.append((node_id, is_online))
-
-        except requests.RequestException as e:
-            print(f"Error fetching data for prefix {prefix:02x}: {e}")
-
-    bulk_update_node_statuses.delay(nodes_to_update)
-    listen_for_relay_events.delay()
-
-
-
-@app.task
-def check_missing_nodes(missing_nodes):
-    nodes_to_update = []
-    for node_id in missing_nodes:
-        is_online = check_node_status(node_id)
-        nodes_to_update.append((node_id, is_online))
-
-    bulk_update_node_statuses.delay(nodes_to_update)
\ No newline at end of file
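Note on the rewritten task (not part of the patch): the per-node get_or_create/save round trips are replaced by a single transaction that locks existing rows with select_for_update and splits one list into bulk_create and bulk_update batches using Django's `_state.adding` flag, which is True only on instances that have never been persisted. A small illustration of that flag (the node_id is made up):

    # _state.adding distinguishes unsaved instances from ones loaded from the
    # database; the task uses it to route nodes into the two bulk batches.
    node = Node(node_id="0xexample", online=True)
    assert node._state.adding is True    # unsaved -> bulk_create batch
    node.save()
    assert node._state.adding is False   # persisted -> bulk_update batch

One behavioral change to be aware of: with the Redis status cache gone, a NodeStatusHistory row is now appended for every incoming (node_id, is_online) pair, not only when a status actually flips. The re-added `from django.db import transaction` duplicates the existing import a few lines above, and `F` and `redis` are imported but unused in this hunk.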
diff --git a/stats-backend/core/celery.py b/stats-backend/core/celery.py
index ba00004..ff1d0ee 100644
--- a/stats-backend/core/celery.py
+++ b/stats-backend/core/celery.py
@@ -75,11 +75,9 @@ def setup_periodic_tasks(sender, **kwargs):
         daily_volume_golem_vs_chain,
         computing_total_over_time,
         extract_wallets_and_ids,
-        initial_relay_nodes_scan,
     )
     v2_offer_scraper.apply_async(args=["ray-on-golem-heads"], queue="yagna", routing_key="yagna")
     v2_offer_scraper.apply_async(queue="yagna", routing_key="yagna")
-    initial_relay_nodes_scan.delay()
     sender.add_periodic_task(
         60,
         computing_total_over_time.s(),
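Deployment note (not part of the patch): with initial_relay_nodes_scan no longer wired into Celery startup, the scan-and-listen loop is owned entirely by the new management command, so it should run as its own supervised long-lived process rather than inside a worker. A minimal sketch of invoking it programmatically (assumes DJANGO_SETTINGS_MODULE is set in the environment):

    # Equivalent to `python manage.py relay_monitor`; this blocks for the
    # lifetime of the event listener, so give it a dedicated process.
    import django
    django.setup()

    from django.core.management import call_command
    call_command('relay_monitor')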