Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Simplify starting subnet in API only mode #82

Merged
merged 8 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
55 changes: 25 additions & 30 deletions packages/service-discovery/api/miner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type MinerService = {
address: string;
};

// this is the data format when we are not running the edge network alongside the subnet.
type MinerServiceApiOnly = {
uid: string;
type: string;
Expand All @@ -40,18 +39,14 @@ export async function POST(request: Request) {
? Boolean(params.get("api-only"))
: false;

// if we run with API only mode, we will not register bittensor specific properties in the data model
if (apiOnly) {
const miner = (await request.json()) as MinerServiceApiOnly;
const pipe = redis
.pipeline()
.hset(`apionly:miner:${String(miner.uid)}`, miner)
.sadd(`apionly:miners:${miner.type}`, miner.uid)
.set(`apionly:address:${miner.uid}`, miner.address)
.set(`apionly:miner:uid:${miner.uid}:address`, miner.address);
const pipe = redis.pipeline().hset(`apionly:miner:${miner.uid}`, miner);

pipe.sadd(`apionly:miners:type:${miner.type}`, miner.uid);

miner.models.forEach((modelName) => {
pipe.sadd(`apionly:${modelName}`, miner.uid);
pipe.sadd(`apionly:miners:model:${modelName}`, miner.uid);
});

await pipe.exec();
Expand All @@ -61,15 +56,12 @@ export async function POST(request: Request) {

const miner = (await request.json()) as MinerService;

const pipe = redis
.pipeline()
.hset(`miner:${String(miner.netuid)}`, miner)
.sadd(`miners:${miner.type}`, miner.netuid)
.set(`address:${miner.netuid}`, miner.address)
.set(`miner:nuid:${miner.netuid}:address`, miner.address);
const pipe = redis.pipeline().hset(`miner:${miner.netuid}`, miner);

pipe.sadd(`miners:type:${miner.type}`, miner.netuid);

miner.models.forEach((modelName) => {
pipe.sadd(modelName, miner.netuid);
pipe.sadd(`miners:model:${modelName}`, miner.netuid);
});

await pipe.exec();
Expand All @@ -94,21 +86,24 @@ export async function GET(request: Request) {

const model = params.get("model");

if (!model) {
return new Response("Error: model is missing in search params", {
status: 400,
});
}
let minersUidForModel: string[] = [];

const minersUidForModel = apiOnly
? await redis.smembers(`apionly:${model}`)
: await redis.smembers(model);
if (model) {
minersUidForModel = apiOnly
? await redis.smembers(`apionly:miners:model:${model}`)
: await redis.smembers(`miners:model:${model}`);

// If the model set does not exist, return an error response
if (minersUidForModel.length === 0) {
return new Response(`Error: no miners found for model ${model}`, {
status: 404,
});
if (minersUidForModel.length === 0) {
return new Response(`Error: no miners found for model ${model}`, {
status: 404,
});
}
} else {
const minerKeys = apiOnly
? await redis.keys("apionly:miner:*")
: await redis.keys("miner:*");

minersUidForModel = minerKeys.map((key) => key.split(":")[apiOnly ? 2 : 1]);
}

const pipe = redis.pipeline();
Expand All @@ -119,7 +114,7 @@ export async function GET(request: Request) {

const miners = await pipe.exec();

return new Response(JSON.stringify(miners), {
return new Response(JSON.stringify(miners.filter(Boolean)), {
headers: { "Content-Type": "application/json" },
});
}
1 change: 1 addition & 0 deletions packages/service-discovery/package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"name": "service-discovery",
"description": "This is an internal API mapping what services are available at which address in the network.",
"module": "NodeNext",
"dependencies": {
"@upstash/redis": "^1.29.0"
Expand Down
2 changes: 1 addition & 1 deletion subnet/miner-cloudflare/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def get_config() -> "bt.Config":
"--api_only",
action="store_true",
help="Bypass connection to metagraph and subtensor and only starts the akeru API layer",
default=False,
default=True,
)

# Adds subtensor specific arguments i.e. --subtensor.chain_endpoint ... --subtensor.network ...
Expand Down
5 changes: 3 additions & 2 deletions subnet/miner-cloudflare/stream_miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def subtensor_connected(self):
def __init__(self, config=None, axon=None, wallet=None, subtensor=None):
# load env variables
load_dotenv()
self.api_only = os.getenv('API_ONLY', 'True')

self.CLOUDFLARE_AUTH_TOKEN = os.getenv('CLOUDFLARE_AUTH_TOKEN')
self.CLOUDFLARE_ACCOUNT_ID = os.getenv('CLOUDFLARE_ACCOUNT_ID')
Expand All @@ -48,7 +49,7 @@ def __init__(self, config=None, axon=None, wallet=None, subtensor=None):

self.prompt_cache: Dict[str, Tuple[str, int]] = {}

if self.config.api_only != True:
if self.api_only != 'True':
# Activating Bittensor's logging with the set configurations.
bt.logging(config=self.config, logging_dir=self.config.full_path)
bt.logging.info("Setting up bittensor objects.")
Expand Down Expand Up @@ -98,8 +99,8 @@ def __init__(self, config=None, axon=None, wallet=None, subtensor=None):
# send to the service map
post(f'{url}/api/miner',
data=json.dumps(service_map_dict), headers=headers)

bt.logging.info(f"Running miner on uid: {self.my_subnet_uid}")

else:
self.uuid = os.getenv('UUID') or uuid.uuid4()
url = os.getenv('SERVICE_MESH_URL')
Expand Down
69 changes: 25 additions & 44 deletions subnet/validator/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
import asyncio
import os
import bittensor as bt
import torch
from miner_manager import MinerManager
from validator import BaseValidatorNeuron
from fastapi import FastAPI, Request
import aiohttp
import random
from reward import calculate_total_message_length, get_reward
from typing import TypedDict, Union, List
from urllib.parse import urljoin, urlencode
from typing import TypedDict, List
from dotenv import load_dotenv

load_dotenv()


api_only = os.getenv('API_ONLY')
miner_manager = MinerManager(api_only=api_only == 'True')


async def run_miner_manager():
while True:
await miner_manager.run()
await asyncio.sleep(10)


class Miner(TypedDict):
address: str
Expand All @@ -23,42 +35,8 @@ class Validator(BaseValidatorNeuron):
def __init__(self, config=None):
super(Validator, self).__init__(config=config)
bt.logging.info("load_state()")
load_dotenv()
self.load_state()

async def get_miner_with_model(self, model_name) -> Union[Miner, dict]:
"""
Asynchronously fetches a miner with a specific model from the service mesh.

Args:
model_name (str): The name of the model to search for.

Returns:
dict: If the response data is a list, it returns a random miner from the list.
If the response data is not a list, it returns the data as is.
"""

api_only = self.subtensor_connected == False
service_map_url = os.getenv('SERVICE_MESH_URL')
secret = os.getenv('SECRET_KEY')
# for now miners are allow listed manually and given a secret key to identify
headers = {'Content-Type': 'application/json',
'Authorization': f'Bearer {secret}'}

base_url = urljoin(service_map_url, '/api/miner')
params = {'model': model_name,
'api-only': 'true' if api_only else 'false'}

request_url = f"{base_url}?{urlencode(params)}"

async with aiohttp.ClientSession() as session:
async with session.get(request_url, headers=headers) as resp:
data = await resp.json()

if isinstance(data, list) and data:
return random.choice(data)

return data
if api_only == 'False':
self.load_state()


app = FastAPI()
Expand All @@ -68,15 +46,14 @@ async def get_miner_with_model(self, model_name) -> Union[Miner, dict]:
@app.post("/chat")
async def chat(request: Request):
data = await request.json()

model = data['model']
miner = await validator.get_miner_with_model(model_name=model)
miner_uid = miner['netuid']
miner = miner_manager.get_fastest_miner_for_model(model=model)
miner_id = miner["id"]
prompt_len = calculate_total_message_length(data)

async with aiohttp.ClientSession() as session:
url = miner['address']
async with session.post(url, json=data) as resp:
async with session.post(f'{url}/chat', json=data) as resp:
response = await resp.json()
completion_len = len(response[-1])

Expand All @@ -85,11 +62,15 @@ async def chat(request: Request):
print(f'reward for prompt: {reward}')
if (validator.subtensor_connected):
validator.update_scores(
torch.FloatTensor([reward]), [int(miner_uid)])
torch.FloatTensor([reward]), [int(miner_id)])

return response


@app.on_event("startup")
async def startup_event():
asyncio.create_task(run_miner_manager())

# The main function parses the configuration and runs the validator.
if __name__ == "__main__":
import uvicorn
Expand Down
Loading
Loading