
Commit

Merge pull request #29 from golemfactory/ssr-nodestatus
wip
cryptobench authored Oct 10, 2024
2 parents 325a9a4 + 7965356 commit ff6f031
Showing 7 changed files with 112 additions and 25 deletions.
3 changes: 2 additions & 1 deletion requirements.pip
@@ -74,4 +74,5 @@ eth-tester
PyJWT
djangorestframework-simplejwt
django-ninja
numpy
numpy
celery-singleton
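
The new celery-singleton dependency backs the relay event listener added in tasks.py below: its Singleton task base keeps a Redis-backed lock per task so that at most one copy is queued or running at a time. A minimal sketch of the pattern, with an app name and broker URL that are illustrative rather than taken from this repo:

from celery import Celery
from celery_singleton import Singleton

# Hypothetical app just to show the pattern; the real app lives in stats-backend/core/celery.py.
app = Celery("example", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")


@app.task(base=Singleton, bind=True)
def single_instance_listener(self):
    # A second .delay() while this task is queued or running is deduplicated,
    # so at most one copy of the listener exists across all workers.
    ...
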
18 changes: 18 additions & 0 deletions stats-backend/api2/migrations/0037_alter_node_version.py
@@ -0,0 +1,18 @@
# Generated by Django 4.1.7 on 2024-10-03 13:45

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('api2', '0036_nodestatushistory_remove_duplicate'),
    ]

    operations = [
        migrations.AlterField(
            model_name='node',
            name='version',
            field=models.CharField(blank=True, db_index=True, max_length=7, null=True),
        ),
    ]
18 changes: 18 additions & 0 deletions stats-backend/api2/migrations/0038_node_type.py
@@ -0,0 +1,18 @@
# Generated by Django 4.1.7 on 2024-10-03 14:09

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('api2', '0037_alter_node_version'),
    ]

    operations = [
        migrations.AddField(
            model_name='node',
            name='type',
            field=models.CharField(db_index=True, default='provider', max_length=42),
        ),
    ]
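
Both migration files are the standard makemigrations output for the model changes in models.py below (version becomes nullable, type is added with a 'provider' default). A sketch of applying them programmatically, assuming the core.settings module referenced elsewhere in this repo:

import os

import django
from django.core.management import call_command

# Equivalent to `python manage.py migrate api2`; the settings module name
# is taken from stats-backend/core/celery.py.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
django.setup()
call_command("migrate", "api2")
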
3 changes: 2 additions & 1 deletion stats-backend/api2/models.py
@@ -12,11 +12,12 @@ class Node(models.Model):
    online = models.BooleanField(default=False, db_index=True)
    earnings_total = models.FloatField(null=True, blank=True)
    computing_now = models.BooleanField(default=False, db_index=True)
    version = models.CharField(max_length=7, db_index=True)
    version = models.CharField(max_length=7, db_index=True, null=True, blank=True)
    updated_at = models.DateTimeField(auto_now=True, db_index=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    uptime_created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    network = models.CharField(max_length=42, default="mainnet", db_index=True)
    type = models.CharField(max_length=42, default="provider", db_index=True)

    def save(self, *args, **kwargs):
        if not self.online:
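
With version now nullable and the indexed type column defaulting to 'provider', requestors and providers can be told apart with a cheap filter. A small sketch of the kind of query the new field enables; the import path and function name are assumptions, not repo code:

from api2.models import Node  # assumed import path

def online_counts_by_type():
    # Both filters use the db_index columns on `type` and `online`.
    return {
        "providers": Node.objects.filter(type="provider", online=True).count(),
        "requestors": Node.objects.filter(type="requestor", online=True).count(),
    }
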
5 changes: 3 additions & 2 deletions stats-backend/api2/scanner.py
@@ -57,7 +57,7 @@ def update_providers_info(node_props):
    new_provider_ids = set(provider_ids) - existing_provider_ids

    # Create new Node instances if any
    new_nodes = [Node(node_id=provider_id) for provider_id in new_provider_ids]
    new_nodes = [Node(node_id=provider_id, type="provider") for provider_id in new_provider_ids]
    if new_nodes:
        Node.objects.bulk_create(new_nodes)

@@ -174,9 +174,10 @@ def update_providers_info(node_props):
        node = existing_nodes_dict[provider_id]
        node.wallet = data.get("wallet")
        node.network = data.get('network', 'mainnet')
        node.type = "provider"
        nodes_to_update.append(node)
    if nodes_to_update:
        Node.objects.bulk_update(nodes_to_update, ['wallet', 'network', 'updated_at'])
        Node.objects.bulk_update(nodes_to_update, ['wallet', 'network', 'updated_at', 'type'])
    print(f"Done updating {len(provider_ids)} providers")


77 changes: 65 additions & 12 deletions stats-backend/api2/tasks.py
@@ -1825,6 +1825,7 @@ def extract_wallets_and_ids():
from django.db import transaction
from django.db.models import Case, When, Value, BooleanField
from .models import NodeStatusHistory, Node
from django.db import IntegrityError

@app.task
def bulk_update_node_statuses(nodes_data):
@@ -1836,6 +1837,25 @@ def bulk_update_node_statuses(nodes_data):
        latest_status = r.get(f"provider:{node_id}:status")

        if latest_status is None or latest_status.decode() != str(is_online):
            try:
                node, created = Node.objects.get_or_create(
                    node_id=node_id,
                    defaults={'online': is_online}
                )
                if created:
                    node.type = "requestor"
                    node.save()
            except IntegrityError:
                # If creation fails due to race condition, try to get the object
                try:
                    node = Node.objects.get(node_id=node_id)
                except Node.DoesNotExist:
                    print(f"Node {node_id} not found")
                    # If still not found, create a new object
                    node = Node(node_id=node_id, online=is_online, type="requestor")
                    node.save()
            node.online = is_online
            node.save()
            status_history_to_create.append(
                NodeStatusHistory(node_id=node_id, is_online=is_online)
            )
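
The get_or_create call can still raise IntegrityError when two workers insert the same node_id at once, which is what the except branch above handles. The same race-safe idea, condensed into a sketch; the helper is hypothetical, not part of this commit:

from django.db import IntegrityError

from api2.models import Node  # assumed import path; tasks.py uses `from .models import Node`


def get_or_create_requestor(node_id, is_online):
    """Race-safe fetch-or-insert; nodes first seen via the relay default to 'requestor'."""
    try:
        return Node.objects.get_or_create(
            node_id=node_id,
            defaults={"online": is_online, "type": "requestor"},
        )
    except IntegrityError:
        # Another worker won the insert race, so the row exists by now.
        return Node.objects.get(node_id=node_id), False
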
@@ -1859,10 +1879,52 @@ def bulk_update_node_statuses(nodes_data):


from .utils import check_node_status
import aiohttp
import asyncio
import json
from celery.utils.log import get_task_logger
from celery_singleton import Singleton


@app.task(base=Singleton, bind=True, max_retries=None)
def listen_for_relay_events(self):
    try:
        asyncio.run(event_listener())
    except Exception as exc:
        print(f"listen_for_relay_events task failed: {exc}")
        self.retry(countdown=5, exc=exc)  # Retry after 5 seconds

async def event_listener():
    url = "http://yacn2.dev.golem.network:9000/events"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            async for line in resp.content:
                if line:
                    try:
                        decoded_line = line.decode('utf-8').strip()
                        if decoded_line.startswith('event:'):
                            event_type = decoded_line.split(':', 1)[1].strip()
                        elif decoded_line.startswith('data:'):
                            node_id = decoded_line.split(':', 1)[1].strip()
                            event = {'Type': event_type, 'Id': node_id}
                            process_event(event)
                    except Exception as e:
                        print(f"Failed to process event: {e}")

def process_event(event):
    event_type = event.get('Type')
    node_id = event.get('Id')

    if event_type == 'new-node':
        print(f"New node: {node_id}")
        bulk_update_node_statuses.delay([(node_id, True)])
    elif event_type == 'lost-node':
        print(f"Lost node: {node_id}")
        bulk_update_node_statuses.delay([(node_id, False)])
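
For reference, the listener above consumes a server-sent-events style stream in which each notification is an event: line naming the event type followed by a data: line carrying the node id; only 'new-node' and 'lost-node' are acted on. A small self-contained sketch of that parsing, with invented node ids for illustration:

# A hypothetical chunk of the /events stream (ids invented) and the minimal
# parse loop the listener applies line by line:
sample_lines = [
    "event: new-node",
    "data: 0x1111111111111111111111111111111111111111",
    "event: lost-node",
    "data: 0x2222222222222222222222222222222222222222",
]

event_type = None
for line in sample_lines:
    if line.startswith("event:"):
        event_type = line.split(":", 1)[1].strip()
    elif line.startswith("data:"):
        node_id = line.split(":", 1)[1].strip()
        # In tasks.py this dict goes to process_event(), which enqueues
        # bulk_update_node_statuses with (node_id, True/False).
        print({"Type": event_type, "Id": node_id})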

@app.task
def fetch_and_update_relay_nodes_online_status():
def initial_relay_nodes_scan():
    base_url = "http://yacn2.dev.golem.network:9000/nodes/"
    current_online_nodes = set()
    nodes_to_update = []

    for prefix in range(256):
@@ -1874,23 +1936,14 @@ def fetch_and_update_relay_nodes_online_status():
            for node_id, sessions in data.items():
                node_id = node_id.strip().lower()
                is_online = bool(sessions) and any('seen' in item for item in sessions if item)
                current_online_nodes.add(node_id)
                nodes_to_update.append((node_id, is_online))

        except requests.RequestException as e:
            print(f"Error fetching data for prefix {prefix:02x}: {e}")

    # Bulk update node statuses
    bulk_update_node_statuses.delay(nodes_to_update)
    listen_for_relay_events.delay()

    # Check providers that were previously online but not found in the current scan
    previously_online = set(NodeStatusHistory.objects.filter(
        is_online=True
    ).order_by('node_id', '-timestamp').distinct('node_id').values_list('node_id', flat=True))

    missing_nodes = previously_online - current_online_nodes
    if missing_nodes:
        check_missing_nodes.delay(list(missing_nodes))


@app.task
13 changes: 4 additions & 9 deletions stats-backend/core/celery.py
@@ -5,6 +5,7 @@
from celery.schedules import crontab



logger = logging.getLogger("Celery")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
@@ -74,17 +75,11 @@ def setup_periodic_tasks(sender, **kwargs):
        daily_volume_golem_vs_chain,
        computing_total_over_time,
        extract_wallets_and_ids,
        fetch_and_update_relay_nodes_online_status,
        initial_relay_nodes_scan,
    )
    v2_offer_scraper.apply_async(args=["ray-on-golem-heads"], queue="yagna", routing_key="yagna")
    v2_offer_scraper.apply_async(queue="yagna", routing_key="yagna")

    sender.add_periodic_task(
        45,
        fetch_and_update_relay_nodes_online_status.s(),
        queue="default",
        options={"queue": "default", "routing_key": "default"},
    )
    initial_relay_nodes_scan.delay()
    sender.add_periodic_task(
        60,
        computing_total_over_time.s(),
@@ -491,4 +486,4 @@ def setup_periodic_tasks(sender, **kwargs):
"app.tasks.default": {"queue": "default"},
"app.tasks.yagna": {"queue": "yagna"},
}
app.conf.broker_connection_retry_on_startup = True
app.conf.broker_connection_retry_on_startup = True
