diff --git a/stats-backend/api2/scanner.py b/stats-backend/api2/scanner.py index 9f41658..75f7ad5 100644 --- a/stats-backend/api2/scanner.py +++ b/stats-backend/api2/scanner.py @@ -174,41 +174,40 @@ def update_nodes_status(provider_id, is_online_now): def update_nodes_data(node_props): r = redis.Redis(host="redis", port=6379, db=0) - with transaction.atomic(): - for props in node_props: - props = json.loads(props) - issuer_id = props["node_id"] - is_online_now = check_node_status(issuer_id) - try: - update_nodes_status(issuer_id, is_online_now) - r.set(f"provider:{issuer_id}:status", str(is_online_now)) - except Exception as e: - print(f"Error updating NodeStatus for {issuer_id}: {e}") - print(f"Done updating {len(node_props)} providers") - # Deserialize each element in node_props into a dictionary - deserialized_node_props = [json.loads(props) for props in node_props] - - # Now create the set - provider_ids_in_props = {props["node_id"] for props in deserialized_node_props} - previously_online_providers_ids = ( - Node.objects.filter(nodestatushistory__is_online=True) - .distinct() - .values_list("node_id", flat=True) - ) - - provider_ids_not_in_scan = ( - set(previously_online_providers_ids) - provider_ids_in_props - ) - - for issuer_id in provider_ids_not_in_scan: - is_online_now = check_node_status(issuer_id) - - try: - update_nodes_status(issuer_id, is_online_now) - r.set(f"provider:{issuer_id}:status", str(is_online_now)) - except Exception as e: - print(f"Error verifying/updating NodeStatus for {issuer_id}: {e}") - print(f"Done updating {len(provider_ids_not_in_scan)} OFFLINE providers") + for props in node_props: + props = json.loads(props) + issuer_id = props["node_id"] + is_online_now = check_node_status(issuer_id) + try: + update_nodes_status(issuer_id, is_online_now) + r.set(f"provider:{issuer_id}:status", str(is_online_now)) + except Exception as e: + print(f"Error updating NodeStatus for {issuer_id}: {e}") + print(f"Done updating {len(node_props)} providers") + # Deserialize each element in node_props into a dictionary + deserialized_node_props = [json.loads(props) for props in node_props] + + # Now create the set + provider_ids_in_props = {props["node_id"] for props in deserialized_node_props} + previously_online_providers_ids = ( + Node.objects.filter(nodestatushistory__is_online=True) + .distinct() + .values_list("node_id", flat=True) + ) + + provider_ids_not_in_scan = ( + set(previously_online_providers_ids) - provider_ids_in_props + ) + + for issuer_id in provider_ids_not_in_scan: + is_online_now = check_node_status(issuer_id) + + try: + update_nodes_status(issuer_id, is_online_now) + r.set(f"provider:{issuer_id}:status", str(is_online_now)) + except Exception as e: + print(f"Error verifying/updating NodeStatus for {issuer_id}: {e}") + print(f"Done updating {len(provider_ids_not_in_scan)} OFFLINE providers") def check_node_status(issuer_id): diff --git a/stats-backend/api2/tasks.py b/stats-backend/api2/tasks.py index 0c94a58..98cdfae 100644 --- a/stats-backend/api2/tasks.py +++ b/stats-backend/api2/tasks.py @@ -54,51 +54,50 @@ def default(self, obj): def online_nodes_uptime_donut_data(): try: - with transaction.atomic(): - # Fetching nodes with only necessary fields to reduce query load - nodes_mainnet = Node.objects.filter(online=True, network="mainnet").only( - "node_id" - ) - nodes_testnet = Node.objects.filter(online=True, network="testnet").only( - "node_id" - ) + # Fetching nodes with only necessary fields to reduce query load + nodes_mainnet = Node.objects.filter(online=True, network="mainnet").only( + "node_id" + ) + nodes_testnet = Node.objects.filter(online=True, network="testnet").only( + "node_id" + ) - # Initializing data structure for each network - uptime_data = { - "mainnet": { - "80_and_over": 0, - "50_to_79": 0, - "30_to_49": 0, - "below_30": 0, - "totalOnline": nodes_mainnet.count(), - }, - "testnet": { - "80_and_over": 0, - "50_to_79": 0, - "30_to_49": 0, - "below_30": 0, - "totalOnline": nodes_testnet.count(), - }, - } + # Initializing data structure for each network + uptime_data = { + "mainnet": { + "80_and_over": 0, + "50_to_79": 0, + "30_to_49": 0, + "below_30": 0, + "totalOnline": nodes_mainnet.count(), + }, + "testnet": { + "80_and_over": 0, + "50_to_79": 0, + "30_to_49": 0, + "below_30": 0, + "totalOnline": nodes_testnet.count(), + }, + } - def update_uptime_data(nodes, network): - for node in nodes: - uptime_percentage = calculate_uptime_percentage(node.node_id) - if uptime_percentage >= 80: - uptime_data[network]["80_and_over"] += 1 - elif 50 <= uptime_percentage < 80: - uptime_data[network]["50_to_79"] += 1 - elif 30 <= uptime_percentage < 50: - uptime_data[network]["30_to_49"] += 1 - else: - uptime_data[network]["below_30"] += 1 - - # Updating uptime data for each network - update_uptime_data(nodes_mainnet, "mainnet") - update_uptime_data(nodes_testnet, "testnet") - - # Save the result in a cache or similar storage - r.set("online_nodes_uptime_donut_data", json.dumps(uptime_data)) + def update_uptime_data(nodes, network): + for node in nodes: + uptime_percentage = calculate_uptime_percentage(node.node_id) + if uptime_percentage >= 80: + uptime_data[network]["80_and_over"] += 1 + elif 50 <= uptime_percentage < 80: + uptime_data[network]["50_to_79"] += 1 + elif 30 <= uptime_percentage < 50: + uptime_data[network]["30_to_49"] += 1 + else: + uptime_data[network]["below_30"] += 1 + + # Updating uptime data for each network + update_uptime_data(nodes_mainnet, "mainnet") + update_uptime_data(nodes_testnet, "testnet") + + # Save the result in a cache or similar storage + r.set("online_nodes_uptime_donut_data", json.dumps(uptime_data)) except Exception as e: print(f"Error: {e}") @@ -802,38 +801,37 @@ def providers_who_received_tasks(): content, status_code = get_stats_data(domain) if status_code == 200: data = content["data"]["result"] - with transaction.atomic(): - for obj in data: - instance_id = obj["metric"]["instance"] - node, _ = Node.objects.get_or_create(node_id=instance_id) - try: - offer = Offer.objects.get(provider=node, runtime="vm") - if offer is None: - continue - pricing_model = offer.properties.get( - "golem.com.pricing.model.linear.coeffs", [] - ) - usage_vector = offer.properties.get("golem.com.usage.vector", []) - if not usage_vector or not pricing_model: - continue - - static_start_price = pricing_model[-1] - cpu_index = usage_vector.index("golem.usage.cpu_sec") - cpu_per_hour_price = pricing_model[cpu_index] * 3600 - - duration_index = usage_vector.index("golem.usage.duration_sec") - env_per_hour_price = pricing_model[duration_index] * 3600 - - ProviderWithTask.objects.create( - instance=node, - offer=offer, - cpu_per_hour=cpu_per_hour_price, - env_per_hour=env_per_hour_price, - start_price=static_start_price, - network=identify_network_by_offer(offer), - ) - except Offer.DoesNotExist: - print(f"Offer for node {node.node_id} not found") + for obj in data: + instance_id = obj["metric"]["instance"] + node, _ = Node.objects.get_or_create(node_id=instance_id) + try: + offer = Offer.objects.get(provider=node, runtime="vm") + if offer is None: + continue + pricing_model = offer.properties.get( + "golem.com.pricing.model.linear.coeffs", [] + ) + usage_vector = offer.properties.get("golem.com.usage.vector", []) + if not usage_vector or not pricing_model: + continue + + static_start_price = pricing_model[-1] + cpu_index = usage_vector.index("golem.usage.cpu_sec") + cpu_per_hour_price = pricing_model[cpu_index] * 3600 + + duration_index = usage_vector.index("golem.usage.duration_sec") + env_per_hour_price = pricing_model[duration_index] * 3600 + + ProviderWithTask.objects.create( + instance=node, + offer=offer, + cpu_per_hour=cpu_per_hour_price, + env_per_hour=env_per_hour_price, + start_price=static_start_price, + network=identify_network_by_offer(offer), + ) + except Offer.DoesNotExist: + print(f"Offer for node {node.node_id} not found") from django.db.models import Avg @@ -967,7 +965,6 @@ def median_and_average_pricing_past_hour(): "start_average": start_average_mainnet if start_average_mainnet else 0, }, } - print(f"Median and average pricing data: {pricing_data}") r.set("pricing_past_hour_v2", json.dumps(pricing_data)) except Exception as e: @@ -1050,67 +1047,64 @@ def pricing_snapshot_stats_with_dates(start_date, end_date, network): @app.task def sum_highest_runtime_resources(): - with transaction.atomic(): - online_nodes = Node.objects.filter(online=True) - - total_cores = 0 - total_memory = 0 - total_storage = 0 - total_gpus = 0 - - for node in online_nodes: - offers = Offer.objects.filter(provider=node) - max_resources = offers.annotate( - cores=Cast( - KeyTextTransform("golem.inf.cpu.threads", "properties"), - IntegerField(), - ), - memory=Cast( - KeyTextTransform("golem.inf.mem.gib", "properties"), FloatField() - ), - storage=Cast( - KeyTextTransform("golem.inf.storage.gib", "properties"), - FloatField(), - ), - gpu_model=KeyTextTransform( - "golem.!exp.gap-35.v1.inf.gpu.model", "properties" - ), - ).aggregate( - max_cores=Max("cores"), - max_memory=Max("memory"), - max_storage=Max("storage"), - gpu_count=Count("gpu_model", filter=Q(gpu_model__isnull=False)), - ) - - total_cores += ( - max_resources["max_cores"] if max_resources["max_cores"] else 0 - ) - total_memory += ( - max_resources["max_memory"] if max_resources["max_memory"] else 0 - ) - total_storage += ( - max_resources["max_storage"] if max_resources["max_storage"] else 0 - ) - total_gpus += max_resources["gpu_count"] + online_nodes = Node.objects.filter(online=True) + + total_cores = 0 + total_memory = 0 + total_storage = 0 + total_gpus = 0 + + for node in online_nodes: + offers = Offer.objects.filter(provider=node) + max_resources = offers.annotate( + cores=Cast( + KeyTextTransform("golem.inf.cpu.threads", "properties"), + IntegerField(), + ), + memory=Cast( + KeyTextTransform("golem.inf.mem.gib", "properties"), FloatField() + ), + storage=Cast( + KeyTextTransform("golem.inf.storage.gib", "properties"), + FloatField(), + ), + gpu_model=KeyTextTransform( + "golem.!exp.gap-35.v1.inf.gpu.model", "properties" + ), + ).aggregate( + max_cores=Max("cores"), + max_memory=Max("memory"), + max_storage=Max("storage"), + gpu_count=Count("gpu_model", filter=Q(gpu_model__isnull=False)), + ) - print( - f"Total cores: {total_cores}" - f"Total memory: {total_memory}" - f"Total storage: {total_storage}" - f"Total gpus: {total_gpus}" + total_cores += max_resources["max_cores"] if max_resources["max_cores"] else 0 + total_memory += ( + max_resources["max_memory"] if max_resources["max_memory"] else 0 ) - r.set( - "v2_network_online_stats", - json.dumps( - { - "providers": online_nodes.count(), - "cores": total_cores, - "memory": total_memory, - "storage": total_storage, - "gpus": total_gpus, - } - ), + total_storage += ( + max_resources["max_storage"] if max_resources["max_storage"] else 0 ) + total_gpus += max_resources["gpu_count"] + + print( + f"Total cores: {total_cores}" + f"Total memory: {total_memory}" + f"Total storage: {total_storage}" + f"Total gpus: {total_gpus}" + ) + r.set( + "v2_network_online_stats", + json.dumps( + { + "providers": online_nodes.count(), + "cores": total_cores, + "memory": total_memory, + "storage": total_storage, + "gpus": total_gpus, + } + ), + ) @app.task @@ -1295,4 +1289,6 @@ def online_nodes_computing(): Node.objects.filter(node_id__in=computing_node_ids).update(computing_now=True) Node.objects.exclude(node_id__in=computing_node_ids).update(computing_now=False) NodeV1.objects.filter(node_id__in=computing_node_ids).update(computing_now=True) - NodeV1.objects.exclude(node_id__in=computing_node_ids).update(computing_now=False) \ No newline at end of file + NodeV1.objects.exclude(node_id__in=computing_node_ids).update( + computing_now=False + ) diff --git a/stats-backend/collector/tasks.py b/stats-backend/collector/tasks.py index 857ab5e..4cc1e2f 100644 --- a/stats-backend/collector/tasks.py +++ b/stats-backend/collector/tasks.py @@ -433,10 +433,9 @@ def network_node_versions(): except Exception as e: print(e) - with transaction.atomic(): - for node_id, version in node_updates: - Node.objects.filter(node_id=node_id).update(version=version) - Nodev2.objects.filter(node_id=node_id).update(version=version) + for node_id, version in node_updates: + Node.objects.filter(node_id=node_id).update(version=version) + Nodev2.objects.filter(node_id=node_id).update(version=version) @app.task @@ -711,23 +710,22 @@ def node_earnings_total(node_version): ) providers_updates.append((provider.pk, updated_earnings_total)) - with transaction.atomic(): - if node_version == "v1": - Node.objects.bulk_update( - [ - Node(pk=pk, earnings_total=earnings) - for pk, earnings in providers_updates - ], - ["earnings_total"], - ) - elif node_version == "v2": - Nodev2.objects.bulk_update( - [ - Nodev2(pk=pk, earnings_total=earnings) - for pk, earnings in providers_updates - ], - ["earnings_total"], - ) + if node_version == "v1": + Node.objects.bulk_update( + [ + Node(pk=pk, earnings_total=earnings) + for pk, earnings in providers_updates + ], + ["earnings_total"], + ) + elif node_version == "v2": + Nodev2.objects.bulk_update( + [ + Nodev2(pk=pk, earnings_total=earnings) + for pk, earnings in providers_updates + ], + ["earnings_total"], + ) @app.task