Remove atomic causing deadlocks
cryptobench committed Mar 20, 2024
1 parent 7529b0f commit f716e38
Showing 3 changed files with 184 additions and 191 deletions.
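
Every hunk below makes the same change: work that previously ran inside a single long-lived transaction.atomic() block now runs in Django's default autocommit mode. Holding one transaction open while looping over providers, making network calls, and writing to Redis keeps row locks alive for the whole scan, so two overlapping Celery task runs can block or deadlock each other. A minimal sketch of the before/after shape, condensed from the scanner.py hunk below (it assumes the usual Django app context; check_node_status and update_nodes_status are the helpers visible in the diff):

import json

from django.db import transaction


# Before: the whole scan shares one transaction, so every row written by
# update_nodes_status stays locked until the final commit. A concurrent run
# touching the same providers can block or deadlock against it.
def update_nodes_data_before(node_props):
    with transaction.atomic():
        for props in node_props:
            issuer_id = json.loads(props)["node_id"]
            is_online_now = check_node_status(issuer_id)  # network I/O inside the transaction
            update_nodes_status(issuer_id, is_online_now)


# After: default autocommit -- each write commits on its own, so locks are short-lived.
def update_nodes_data_after(node_props):
    for props in node_props:
        issuer_id = json.loads(props)["node_id"]
        is_online_now = check_node_status(issuer_id)  # no locks held during the network call
        update_nodes_status(issuer_id, is_online_now)
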
69 changes: 34 additions & 35 deletions stats-backend/api2/scanner.py
@@ -174,41 +174,40 @@ def update_nodes_status(provider_id, is_online_now):
 def update_nodes_data(node_props):
     r = redis.Redis(host="redis", port=6379, db=0)
 
-    with transaction.atomic():
-        for props in node_props:
-            props = json.loads(props)
-            issuer_id = props["node_id"]
-            is_online_now = check_node_status(issuer_id)
-            try:
-                update_nodes_status(issuer_id, is_online_now)
-                r.set(f"provider:{issuer_id}:status", str(is_online_now))
-            except Exception as e:
-                print(f"Error updating NodeStatus for {issuer_id}: {e}")
-        print(f"Done updating {len(node_props)} providers")
-        # Deserialize each element in node_props into a dictionary
-        deserialized_node_props = [json.loads(props) for props in node_props]
-
-        # Now create the set
-        provider_ids_in_props = {props["node_id"] for props in deserialized_node_props}
-        previously_online_providers_ids = (
-            Node.objects.filter(nodestatushistory__is_online=True)
-            .distinct()
-            .values_list("node_id", flat=True)
-        )
-
-        provider_ids_not_in_scan = (
-            set(previously_online_providers_ids) - provider_ids_in_props
-        )
-
-        for issuer_id in provider_ids_not_in_scan:
-            is_online_now = check_node_status(issuer_id)
-
-            try:
-                update_nodes_status(issuer_id, is_online_now)
-                r.set(f"provider:{issuer_id}:status", str(is_online_now))
-            except Exception as e:
-                print(f"Error verifying/updating NodeStatus for {issuer_id}: {e}")
-        print(f"Done updating {len(provider_ids_not_in_scan)} OFFLINE providers")
+    for props in node_props:
+        props = json.loads(props)
+        issuer_id = props["node_id"]
+        is_online_now = check_node_status(issuer_id)
+        try:
+            update_nodes_status(issuer_id, is_online_now)
+            r.set(f"provider:{issuer_id}:status", str(is_online_now))
+        except Exception as e:
+            print(f"Error updating NodeStatus for {issuer_id}: {e}")
+    print(f"Done updating {len(node_props)} providers")
+    # Deserialize each element in node_props into a dictionary
+    deserialized_node_props = [json.loads(props) for props in node_props]
+
+    # Now create the set
+    provider_ids_in_props = {props["node_id"] for props in deserialized_node_props}
+    previously_online_providers_ids = (
+        Node.objects.filter(nodestatushistory__is_online=True)
+        .distinct()
+        .values_list("node_id", flat=True)
+    )
+
+    provider_ids_not_in_scan = (
+        set(previously_online_providers_ids) - provider_ids_in_props
+    )
+
+    for issuer_id in provider_ids_not_in_scan:
+        is_online_now = check_node_status(issuer_id)
+
+        try:
+            update_nodes_status(issuer_id, is_online_now)
+            r.set(f"provider:{issuer_id}:status", str(is_online_now))
+        except Exception as e:
+            print(f"Error verifying/updating NodeStatus for {issuer_id}: {e}")
+    print(f"Done updating {len(provider_ids_not_in_scan)} OFFLINE providers")
 
 
 def check_node_status(issuer_id):
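
Dropping the transaction entirely is what this commit does. If per-provider atomicity were still wanted, an alternative (not part of this commit) would be to keep a transaction but scope it to a single write, so no lock survives the slow status probe. A sketch reusing the names from the hunk above:

from django.db import transaction

for props in node_props:
    issuer_id = json.loads(props)["node_id"]
    is_online_now = check_node_status(issuer_id)  # network I/O happens outside any transaction
    with transaction.atomic():  # lock held only for this one provider's write
        update_nodes_status(issuer_id, is_online_now)
    r.set(f"provider:{issuer_id}:status", str(is_online_now))
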
266 changes: 131 additions & 135 deletions stats-backend/api2/tasks.py
@@ -54,51 +54,50 @@ def default(self, obj):
 def online_nodes_uptime_donut_data():
 
     try:
-        with transaction.atomic():
-            # Fetching nodes with only necessary fields to reduce query load
-            nodes_mainnet = Node.objects.filter(online=True, network="mainnet").only(
-                "node_id"
-            )
-            nodes_testnet = Node.objects.filter(online=True, network="testnet").only(
-                "node_id"
-            )
+        # Fetching nodes with only necessary fields to reduce query load
+        nodes_mainnet = Node.objects.filter(online=True, network="mainnet").only(
+            "node_id"
+        )
+        nodes_testnet = Node.objects.filter(online=True, network="testnet").only(
+            "node_id"
+        )
 
-            # Initializing data structure for each network
-            uptime_data = {
-                "mainnet": {
-                    "80_and_over": 0,
-                    "50_to_79": 0,
-                    "30_to_49": 0,
-                    "below_30": 0,
-                    "totalOnline": nodes_mainnet.count(),
-                },
-                "testnet": {
-                    "80_and_over": 0,
-                    "50_to_79": 0,
-                    "30_to_49": 0,
-                    "below_30": 0,
-                    "totalOnline": nodes_testnet.count(),
-                },
-            }
+        # Initializing data structure for each network
+        uptime_data = {
+            "mainnet": {
+                "80_and_over": 0,
+                "50_to_79": 0,
+                "30_to_49": 0,
+                "below_30": 0,
+                "totalOnline": nodes_mainnet.count(),
+            },
+            "testnet": {
+                "80_and_over": 0,
+                "50_to_79": 0,
+                "30_to_49": 0,
+                "below_30": 0,
+                "totalOnline": nodes_testnet.count(),
+            },
+        }
 
-            def update_uptime_data(nodes, network):
-                for node in nodes:
-                    uptime_percentage = calculate_uptime_percentage(node.node_id)
-                    if uptime_percentage >= 80:
-                        uptime_data[network]["80_and_over"] += 1
-                    elif 50 <= uptime_percentage < 80:
-                        uptime_data[network]["50_to_79"] += 1
-                    elif 30 <= uptime_percentage < 50:
-                        uptime_data[network]["30_to_49"] += 1
-                    else:
-                        uptime_data[network]["below_30"] += 1
-
-            # Updating uptime data for each network
-            update_uptime_data(nodes_mainnet, "mainnet")
-            update_uptime_data(nodes_testnet, "testnet")
-
-            # Save the result in a cache or similar storage
-            r.set("online_nodes_uptime_donut_data", json.dumps(uptime_data))
+        def update_uptime_data(nodes, network):
+            for node in nodes:
+                uptime_percentage = calculate_uptime_percentage(node.node_id)
+                if uptime_percentage >= 80:
+                    uptime_data[network]["80_and_over"] += 1
+                elif 50 <= uptime_percentage < 80:
+                    uptime_data[network]["50_to_79"] += 1
+                elif 30 <= uptime_percentage < 50:
+                    uptime_data[network]["30_to_49"] += 1
+                else:
+                    uptime_data[network]["below_30"] += 1
+
+        # Updating uptime data for each network
+        update_uptime_data(nodes_mainnet, "mainnet")
+        update_uptime_data(nodes_testnet, "testnet")
+
+        # Save the result in a cache or similar storage
+        r.set("online_nodes_uptime_donut_data", json.dumps(uptime_data))
     except Exception as e:
         print(f"Error: {e}")
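
online_nodes_uptime_donut_data only reads Node rows and writes one aggregated JSON blob to Redis, so the removed transaction was not protecting any multi-row write. A consumer can read the cached result back like this (connection settings copied from scanner.py in this same commit; the key name comes from the r.set call above):

import json

import redis

r = redis.Redis(host="redis", port=6379, db=0)
raw = r.get("online_nodes_uptime_donut_data")
if raw is not None:
    uptime_data = json.loads(raw)
    # e.g. how many mainnet providers fall in the top uptime bucket
    print(uptime_data["mainnet"]["80_and_over"], "of", uptime_data["mainnet"]["totalOnline"])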

@@ -802,38 +801,37 @@ def providers_who_received_tasks():
     content, status_code = get_stats_data(domain)
     if status_code == 200:
         data = content["data"]["result"]
-        with transaction.atomic():
-            for obj in data:
-                instance_id = obj["metric"]["instance"]
-                node, _ = Node.objects.get_or_create(node_id=instance_id)
-                try:
-                    offer = Offer.objects.get(provider=node, runtime="vm")
-                    if offer is None:
-                        continue
-                    pricing_model = offer.properties.get(
-                        "golem.com.pricing.model.linear.coeffs", []
-                    )
-                    usage_vector = offer.properties.get("golem.com.usage.vector", [])
-                    if not usage_vector or not pricing_model:
-                        continue
-
-                    static_start_price = pricing_model[-1]
-                    cpu_index = usage_vector.index("golem.usage.cpu_sec")
-                    cpu_per_hour_price = pricing_model[cpu_index] * 3600
-
-                    duration_index = usage_vector.index("golem.usage.duration_sec")
-                    env_per_hour_price = pricing_model[duration_index] * 3600
-
-                    ProviderWithTask.objects.create(
-                        instance=node,
-                        offer=offer,
-                        cpu_per_hour=cpu_per_hour_price,
-                        env_per_hour=env_per_hour_price,
-                        start_price=static_start_price,
-                        network=identify_network_by_offer(offer),
-                    )
-                except Offer.DoesNotExist:
-                    print(f"Offer for node {node.node_id} not found")
+        for obj in data:
+            instance_id = obj["metric"]["instance"]
+            node, _ = Node.objects.get_or_create(node_id=instance_id)
+            try:
+                offer = Offer.objects.get(provider=node, runtime="vm")
+                if offer is None:
+                    continue
+                pricing_model = offer.properties.get(
+                    "golem.com.pricing.model.linear.coeffs", []
+                )
+                usage_vector = offer.properties.get("golem.com.usage.vector", [])
+                if not usage_vector or not pricing_model:
+                    continue
+
+                static_start_price = pricing_model[-1]
+                cpu_index = usage_vector.index("golem.usage.cpu_sec")
+                cpu_per_hour_price = pricing_model[cpu_index] * 3600
+
+                duration_index = usage_vector.index("golem.usage.duration_sec")
+                env_per_hour_price = pricing_model[duration_index] * 3600
+
+                ProviderWithTask.objects.create(
+                    instance=node,
+                    offer=offer,
+                    cpu_per_hour=cpu_per_hour_price,
+                    env_per_hour=env_per_hour_price,
+                    start_price=static_start_price,
+                    network=identify_network_by_offer(offer),
+                )
+            except Offer.DoesNotExist:
+                print(f"Offer for node {node.node_id} not found")
 
 
 from django.db.models import Avg
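
The pricing logic in providers_who_received_tasks is unchanged: each coefficient in golem.com.pricing.model.linear.coeffs is a per-second price for the matching entry in golem.com.usage.vector, the trailing coefficient is the fixed start price, and multiplying by 3600 converts the per-second rates to per-hour figures. A worked example with made-up coefficient values:

usage_vector = ["golem.usage.cpu_sec", "golem.usage.duration_sec"]
pricing_model = [0.00005, 0.00001, 0.0003]  # illustrative: GLM/cpu-sec, GLM/sec, start price

cpu_per_hour = pricing_model[usage_vector.index("golem.usage.cpu_sec")] * 3600       # 0.18
env_per_hour = pricing_model[usage_vector.index("golem.usage.duration_sec")] * 3600  # 0.036
start_price = pricing_model[-1]                                                      # 0.0003

print(cpu_per_hour, env_per_hour, start_price)
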
@@ -967,7 +965,6 @@ def median_and_average_pricing_past_hour():
                 "start_average": start_average_mainnet if start_average_mainnet else 0,
             },
         }
-        print(f"Median and average pricing data: {pricing_data}")
 
         r.set("pricing_past_hour_v2", json.dumps(pricing_data))
     except Exception as e:
@@ -1050,67 +1047,64 @@ def pricing_snapshot_stats_with_dates(start_date, end_date, network):
 
 @app.task
 def sum_highest_runtime_resources():
-    with transaction.atomic():
-        online_nodes = Node.objects.filter(online=True)
-
-        total_cores = 0
-        total_memory = 0
-        total_storage = 0
-        total_gpus = 0
-
-        for node in online_nodes:
-            offers = Offer.objects.filter(provider=node)
-            max_resources = offers.annotate(
-                cores=Cast(
-                    KeyTextTransform("golem.inf.cpu.threads", "properties"),
-                    IntegerField(),
-                ),
-                memory=Cast(
-                    KeyTextTransform("golem.inf.mem.gib", "properties"), FloatField()
-                ),
-                storage=Cast(
-                    KeyTextTransform("golem.inf.storage.gib", "properties"),
-                    FloatField(),
-                ),
-                gpu_model=KeyTextTransform(
-                    "golem.!exp.gap-35.v1.inf.gpu.model", "properties"
-                ),
-            ).aggregate(
-                max_cores=Max("cores"),
-                max_memory=Max("memory"),
-                max_storage=Max("storage"),
-                gpu_count=Count("gpu_model", filter=Q(gpu_model__isnull=False)),
-            )
-
-            total_cores += (
-                max_resources["max_cores"] if max_resources["max_cores"] else 0
-            )
-            total_memory += (
-                max_resources["max_memory"] if max_resources["max_memory"] else 0
-            )
-            total_storage += (
-                max_resources["max_storage"] if max_resources["max_storage"] else 0
-            )
-            total_gpus += max_resources["gpu_count"]
-
-        print(
-            f"Total cores: {total_cores}"
-            f"Total memory: {total_memory}"
-            f"Total storage: {total_storage}"
-            f"Total gpus: {total_gpus}"
-        )
-        r.set(
-            "v2_network_online_stats",
-            json.dumps(
-                {
-                    "providers": online_nodes.count(),
-                    "cores": total_cores,
-                    "memory": total_memory,
-                    "storage": total_storage,
-                    "gpus": total_gpus,
-                }
-            ),
-        )
+    online_nodes = Node.objects.filter(online=True)
+
+    total_cores = 0
+    total_memory = 0
+    total_storage = 0
+    total_gpus = 0
+
+    for node in online_nodes:
+        offers = Offer.objects.filter(provider=node)
+        max_resources = offers.annotate(
+            cores=Cast(
+                KeyTextTransform("golem.inf.cpu.threads", "properties"),
+                IntegerField(),
+            ),
+            memory=Cast(
+                KeyTextTransform("golem.inf.mem.gib", "properties"), FloatField()
+            ),
+            storage=Cast(
+                KeyTextTransform("golem.inf.storage.gib", "properties"),
+                FloatField(),
+            ),
+            gpu_model=KeyTextTransform(
+                "golem.!exp.gap-35.v1.inf.gpu.model", "properties"
+            ),
+        ).aggregate(
+            max_cores=Max("cores"),
+            max_memory=Max("memory"),
+            max_storage=Max("storage"),
+            gpu_count=Count("gpu_model", filter=Q(gpu_model__isnull=False)),
+        )
+
+        total_cores += max_resources["max_cores"] if max_resources["max_cores"] else 0
+        total_memory += (
+            max_resources["max_memory"] if max_resources["max_memory"] else 0
+        )
+        total_storage += (
+            max_resources["max_storage"] if max_resources["max_storage"] else 0
+        )
+        total_gpus += max_resources["gpu_count"]
+
+    print(
+        f"Total cores: {total_cores}"
+        f"Total memory: {total_memory}"
+        f"Total storage: {total_storage}"
+        f"Total gpus: {total_gpus}"
+    )
+    r.set(
+        "v2_network_online_stats",
+        json.dumps(
+            {
+                "providers": online_nodes.count(),
+                "cores": total_cores,
+                "memory": total_memory,
+                "storage": total_storage,
+                "gpus": total_gpus,
+            }
+        ),
+    )
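
For readers less familiar with KeyTextTransform and Cast: per provider, the query takes the highest advertised cores, memory, and storage across that provider's offers and counts the offers that expose a GPU model, and those per-provider maxima are then summed across all online nodes. A rough plain-Python equivalent over already-fetched property dicts (the property keys are the ones used above; everything else is illustrative):

def max_resources_for(offer_properties):
    # offer_properties: list of dicts, one per Offer.properties of a single provider
    cores = [int(p["golem.inf.cpu.threads"]) for p in offer_properties if "golem.inf.cpu.threads" in p]
    memory = [float(p["golem.inf.mem.gib"]) for p in offer_properties if "golem.inf.mem.gib" in p]
    storage = [float(p["golem.inf.storage.gib"]) for p in offer_properties if "golem.inf.storage.gib" in p]
    gpus = sum(1 for p in offer_properties if p.get("golem.!exp.gap-35.v1.inf.gpu.model") is not None)
    return {
        "max_cores": max(cores, default=0),
        "max_memory": max(memory, default=0.0),
        "max_storage": max(storage, default=0.0),
        "gpu_count": gpus,
    }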


@app.task
@@ -1295,4 +1289,6 @@ def online_nodes_computing():
     Node.objects.filter(node_id__in=computing_node_ids).update(computing_now=True)
     Node.objects.exclude(node_id__in=computing_node_ids).update(computing_now=False)
     NodeV1.objects.filter(node_id__in=computing_node_ids).update(computing_now=True)
-    NodeV1.objects.exclude(node_id__in=computing_node_ids).update(computing_now=False)
+    NodeV1.objects.exclude(node_id__in=computing_node_ids).update(
+        computing_now=False
+    )