Merge pull request #16 from tplr-ai/hotfix/put_get
Use the right file names for puts and gets
distributedstatemachine authored Dec 31, 2024
2 parents 1974f6a + c4be210 commit aadf916
Showing 7 changed files with 128 additions and 70 deletions.
4 changes: 4 additions & 0 deletions .dockerignore
@@ -0,0 +1,4 @@
+checkpoints
+wandb
+.pytest*
+.venv
12 changes: 8 additions & 4 deletions docs/miner.md
@@ -211,8 +211,6 @@ You should see a container named `templar-miner-<WALLET_HOTKEY>`.
    # Install PyTorch with CUDA support
    pip install torch --index-url https://download.pytorch.org/whl/cu118
 
-   # Install other requirements
-   pip install -r requirements.txt
 
    # Install uv tool (if needed)
    pip install uv
@@ -238,14 +236,21 @@ You should see a container named `templar-miner-<WALLET_HOTKEY>`.
 7. **Set Environment Variables**:
 
    Export necessary environment variables or create a `.env` file in the project root.
+
    ```bash
    export WANDB_API_KEY=your_wandb_api_key
    export NODE_TYPE=your_node_type
    export WALLET_NAME=your_wallet_name
    export WALLET_HOTKEY=your_wallet_hotkey
    export CUDA_DEVICE=your_cuda_device
    export NETWORK=your_network
    export NETUID=your_netuid
    export DEBUG=your_debug_setting
+   export R2_ACCOUNT_ID=your_r2_account_id
+   export R2_READ_ACCESS_KEY_ID=your_r2_read_access_key_id
+   export R2_READ_SECRET_ACCESS_KEY=your_r2_read_secret_access_key
+   export R2_WRITE_ACCESS_KEY_ID=your_r2_write_access_key_id
+   export R2_WRITE_SECRET_ACCESS_KEY=your_r2_write_secret_access_key
+   export GITHUB_USER=your_github_username
    ```
 
 8. **Run the Miner**:
@@ -304,7 +309,6 @@ DEBUG=false
 - **GPU Requirements**:
   - Minimum: NVIDIA H100 with 80GB VRAM
 - **Storage**: 100GB+ recommended for model and data
-- **RAM**: 32GB+ recommended
 - **Network**: Stable internet connection with good bandwidth
 
 ### Network Options
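The set of required variables grows in this change to include the Cloudflare R2 credentials and a GitHub username. Below is a minimal sketch of a startup check for them; the variable names come from the export list above, while the helper itself is illustrative and not part of this commit.

```python
import os
import sys

# Names taken from the export list in docs/miner.md above.
REQUIRED_ENV_VARS = [
    "WANDB_API_KEY", "NODE_TYPE", "WALLET_NAME", "WALLET_HOTKEY",
    "CUDA_DEVICE", "NETWORK", "NETUID", "DEBUG",
    "R2_ACCOUNT_ID",
    "R2_READ_ACCESS_KEY_ID", "R2_READ_SECRET_ACCESS_KEY",
    "R2_WRITE_ACCESS_KEY_ID", "R2_WRITE_SECRET_ACCESS_KEY",
    "GITHUB_USER",
]

def check_env() -> None:
    """Fail fast if any required variable is unset or empty."""
    missing = [name for name in REQUIRED_ENV_VARS if not os.environ.get(name)]
    if missing:
        sys.exit(f"Missing environment variables: {', '.join(missing)}")

if __name__ == "__main__":
    check_env()
```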
17 changes: 16 additions & 1 deletion docs/validator.md
@@ -164,7 +164,22 @@ You should see a container named `templar-validator-<WALLET_HOTKEY>`.
 
 4. **Set Up Python Environment**:
 
-   Same as in the miner setup.
+   ```bash
+   export WANDB_API_KEY=your_wandb_api_key
+   export NODE_TYPE=your_node_type
+   export WALLET_NAME=your_wallet_name
+   export WALLET_HOTKEY=your_wallet_hotkey
+   export CUDA_DEVICE=your_cuda_device
+   export NETWORK=your_network
+   export NETUID=your_netuid
+   export DEBUG=your_debug_setting
+   export R2_ACCOUNT_ID=your_r2_account_id
+   export R2_READ_ACCESS_KEY_ID=your_r2_read_access_key_id
+   export R2_READ_SECRET_ACCESS_KEY=your_r2_read_secret_access_key
+   export R2_WRITE_ACCESS_KEY_ID=your_r2_write_access_key_id
+   export R2_WRITE_SECRET_ACCESS_KEY=your_r2_write_secret_access_key
+   export GITHUB_USER=your_github_username
+   ```
 
 5. **Create and Register Validator Wallet**:

46 changes: 30 additions & 16 deletions neurons/validator.py
@@ -248,34 +248,48 @@ async def run(self):
                 device=self.config.device,
                 local=False
             )
+            # Log gradient stats
+            tplr.logger.info(f"Gradient stats - Window: {self.sync_window}")
 
             # Check if any gradients were gathered
-            if not step_grads == 0:
+            if step_grads is None:
                 tplr.logger.info("No gradients received, waiting for next window.")
                 continue
 
+            tplr.logger.info(f"Received gradients from UIDs: {step_grads.uids}")
+
             # Decompress state and apply to gradients
-            for n, p in self.model.named_parameters():
-                new_grad = self.transformer.decode(
-                    self.compressor.batch_decompress(
-                        p.to(self.config.device),
-                        step_grads.state_dict[n + 'idxs'],
-                        step_grads.state_dict[n + 'vals'],
-                        self.xshapes[n], self.totalks[n]
-                    )
-                )
-                # Set recomputed gathered gradient
+            for n, p in self.model.named_parameters():
+                # Initialize an empty tensor for the aggregated gradient
+                aggregated_grad = torch.zeros_like(p, device=self.config.device)
+
+                # Sum gradients from all valid UIDs
+                for idx, uid in enumerate(step_grads.uids):
+                    new_grad = self.transformer.decode(
+                        self.compressor.decompress(
+                            p.to(self.config.device),
+                            step_grads.state_dict.__dict__[n + 'idxs'][idx],
+                            step_grads.state_dict.__dict__[n + 'vals'][idx],
+                            self.xshapes[n], self.totalks[n]
+                        )
+                    )
+
+                    # Aggregate the gradients (e.g., sum or average)
+                    # Here, we'll sum them up
+                    aggregated_grad.add_(new_grad)
+
+                # Optionally average the gradient
+                aggregated_grad.div_(len(step_grads.uids))
+
+                # Set the aggregated gradient
                 if p.grad is None:
-                    p.grad = new_grad
+                    p.grad = aggregated_grad
                 else:
-                    p.grad.copy_(new_grad)
+                    p.grad.copy_(aggregated_grad)
                 p.grad.sign_()
 
             # Apply the optimizer step
             self.optimizer.step()
             self.scheduler.step()
 
+            self.wandb.log({"lr": self.scheduler.get_last_lr()[0]}, step=self.global_step)
+
             # Get a random peer to eval on their gradient at self.sync_window + 1
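Stripped of the compression plumbing, the new validator loop reduces to: decode each peer's gradient for a parameter, sum the results, average, and keep only the sign. Here is a minimal sketch of that pattern with plain tensors; `decode_grad` is a hypothetical stand-in for the `transformer.decode(compressor.decompress(...))` pipeline in the diff.

```python
import torch

def aggregate_signed_grads(param: torch.nn.Parameter,
                           peer_payloads: list,
                           decode_grad) -> None:
    """Sum decoded peer gradients, average them, and keep only the sign.

    `decode_grad` is a hypothetical stand-in for the
    transformer.decode(compressor.decompress(...)) pipeline in the diff.
    """
    aggregated = torch.zeros_like(param)  # empty accumulator, as in the diff
    for payload in peer_payloads:
        aggregated.add_(decode_grad(param, payload))  # sum across peers
    aggregated.div_(len(peer_payloads))   # average
    if param.grad is None:
        param.grad = aggregated
    else:
        param.grad.copy_(aggregated)
    param.grad.sign_()  # sign step, as in the committed code

# Trivial decoder: payloads are already dense gradients.
p = torch.nn.Parameter(torch.zeros(3))
payloads = [torch.tensor([1.0, -2.0, 0.5]), torch.tensor([3.0, -1.0, -0.5])]
aggregate_signed_grads(p, payloads, lambda param, x: x)
print(p.grad)  # tensor([ 1., -1.,  0.]) — sign of the averaged gradient
```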
2 changes: 1 addition & 1 deletion src/tplr/__init__.py
@@ -20,7 +20,7 @@
 # mypy: ignore-errors
 # type: ignore
 
-__version__ = "0.2.0"
+__version__ = "0.2.1"
 
 # Import package.
 from .chain import *
4 changes: 2 additions & 2 deletions src/tplr/chain.py
@@ -42,7 +42,7 @@ def __init__(
         netuid: Optional[int] = None,
         metagraph=None,
         hparams=None,
-        fetch_interval: int = 60,  # Fetch interval in seconds
+        fetch_interval: int = 600,  # Fetch interval in seconds
         wallet: Optional["bt.wallet"] = None,
         bucket: Optional[Bucket] = None,
     ):
@@ -346,7 +346,7 @@ async def get_commitments(self, block: Optional[int] = None) -> Dict[int, Bucket]:
                     secret_access_key=concatenated[64:],
                 )
                 commitments[uid] = bucket
-                logger.success(f"Retrieved bucket commitment for UID {uid}")
+                logger.debug(f"Retrieved bucket commitment for UID {uid}")
 
             except Exception as e:
                 logger.error(f"Failed to decode commitment for UID {uid}: {e}")
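The only part of the commitment decoding visible here is the `concatenated[64:]` slice, which suggests the on-chain commitment packs fixed-width credential fields back to back. A hypothetical decoder under that assumption follows; the 64-character width of the leading field is inferred for illustration and is not confirmed by this diff.

```python
SECRET_OFFSET = 64  # only the [64:] slice appears in the diff above

def decode_commitment(concatenated: str) -> dict:
    """Split a packed commitment string into credentials.

    The diff shows `secret_access_key=concatenated[64:]`; how the first
    64 characters map onto the remaining fields is assumed here.
    """
    return {
        "access_key_id": concatenated[:SECRET_OFFSET],      # assumption
        "secret_access_key": concatenated[SECRET_OFFSET:],  # as in the diff
    }
```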
113 changes: 67 additions & 46 deletions src/tplr/comms.py
@@ -278,7 +278,7 @@ async def put(
 
         # Check file size
         file_size = os.path.getsize(temp_file_path)
-        object_key = f"{uid}/{window}/{filename}"
+        object_key = filename
 
         if file_size > 5 * 1024 * 1024 * 1024:  # 5GB
             # Use multipart upload for large files
@@ -336,54 +336,66 @@ async def get(
                 uid=uid, current_window=window, stale_retention=stale_retention
             )
 
-            # Check file size first
+            # Get the peer's bucket from commitments
+            peer_bucket = self.commitments.get(int(uid))
+            if not peer_bucket:
+                tplr.logger.debug(f"No bucket found for UID {uid}")
+                return None
+
             async with self.session.create_client(
                 "s3",
-                endpoint_url=self.get_base_url(self.bucket.account_id),
+                endpoint_url=self.get_base_url(peer_bucket.account_id),
                 region_name=CF_REGION_NAME,
                 config=client_config,
-                aws_access_key_id=self.bucket.access_key_id,
-                aws_secret_access_key=self.bucket.secret_access_key,
+                aws_access_key_id=peer_bucket.access_key_id,
+                aws_secret_access_key=peer_bucket.secret_access_key,
             ) as s3_client:
                 try:
-                    response = await s3_client.head_object(
-                        Bucket=self.bucket.name, Key=full_key
+                    # Check if file exists first
+                    await s3_client.head_object(
+                        Bucket=peer_bucket.name, Key=filename
                     )
-                    file_size = response["ContentLength"]
                 except (
                     botocore.exceptions.ClientError,
                     botocore.exceptions.BotoCoreError,
                 ) as e:
-                    tplr.logger.debug(f"Failed to get object metadata: {e}")
-                    return None
-
-                # Create a temporary file for download
-                with tempfile.NamedTemporaryFile(
-                    delete=False, suffix=".pt"
-                ) as temp_file:
-                    temp_file_path = temp_file.name
-
-                try:
-                    if file_size > 5 * 1024 * 1024 * 1024:  # 5GB
-                        # Use multipart download for large files
-                        success = await self.download_large_file(
-                            full_key, temp_file_path
-                        )
-                        if not success:
-                            raise Exception("Large file download failed")
-                    else:
-                        # Use regular download for smaller files
-                        state_dict = await self.s3_get_object(full_key, timeout)
-                        return state_dict
-
-                    # Load the state dict from the temporary file
-                    state_dict = torch.load(temp_file_path, weights_only=True)
-                    return state_dict
+                    error_code = e.response["Error"]["Code"]
+                    if error_code == "404":
+                        tplr.logger.debug(
+                            f"Gradient not found for uid {uid} at window {window}. Skipping."
+                        )
+                        return None
+                    else:
+                        raise  # Re-raise if it's a different exception
+
+                # Proceed to get the object if it exists
+                response = await asyncio.wait_for(
+                    s3_client.get_object(Bucket=peer_bucket.name, Key=filename),
+                    timeout=timeout,
+                )
 
-                finally:
-                    # Clean up temporary file
-                    if os.path.exists(temp_file_path):
-                        os.remove(temp_file_path)
+                # Save to a temporary file and load
+                with tempfile.NamedTemporaryFile(
+                    delete=True, suffix=".pt"
+                ) as temp_file:
+                    temp_file_path = temp_file.name
+                    async with aiofiles.open(temp_file_path, "wb") as outfile:
+                        while True:
+                            chunk = await response["Body"].read(1 * 1024 * 1024)
+                            if not chunk:
+                                break
+                            await outfile.write(chunk)
+
+                    # Load the object
+                    try:
+                        with open(temp_file_path, "rb") as f:
+                            state_dict = torch.load(f, weights_only=True)
+                        return state_dict
+                    except Exception as e:
+                        tplr.logger.debug(
+                            f"Error loading state_dict from {full_key}: {e}"
+                        )
+                        return None
 
         except Exception as e:
             tplr.logger.debug(f"GET error {full_key}: {e}")
@@ -473,39 +485,49 @@ async def gather(
         # Initialize the aggregated state dict
         aggregated_state_dict = {}
         successes = []
+        valid_uids = []
 
         # Process responses
         responses = await asyncio.gather(*gather_tasks)
         for idx, resp in enumerate(responses):
+            uid = uids[idx]
             if resp is None:
                 successes.append(False)
                 continue
 
             successes.append(True)
+            valid_uids.append(uid)
 
             # Initialize aggregated_state_dict if empty
             if not aggregated_state_dict:
-                aggregated_state_dict = {
-                    param_name: [torch.zeros_like(tensor).to(device) for _ in uids]
-                    for param_name, tensor in resp.items()
-                }
+                aggregated_state_dict = {param_name: [] for param_name in resp.keys()}
 
-            # Fill in data from this response
+            # Append tensors to aggregated_state_dict
             for param_name, tensor in resp.items():
-                aggregated_state_dict[param_name][idx] = tensor.to(device)
+                aggregated_state_dict[param_name].append(tensor.to(device))
                 metrics["download_bytes"] += tensor.element_size() * tensor.nelement()
 
-        # Calculate success metrics
+        # Compute success rate
        success_rate = sum(successes) / len(successes) if successes else 0
        total_time = time.time() - start_time
 
+        # If no gradients were gathered, return an indicator
+        if not valid_uids:
+            tplr.logger.info("No gradients received from any UID.")
+            return None  # or return an appropriate indicator
+
+        # For batch processing, ensure the lists are aligned
+        # with the same order for each parameter
+        aggregated_state_dict_namespace = SimpleNamespace(**aggregated_state_dict)
+
         return SimpleNamespace(
             time=total_time,
             upload_bytes=metrics["upload_bytes"],
             download_bytes=metrics["download_bytes"],
             success_rate=success_rate,
             successes=successes,
-            state_dict=aggregated_state_dict,
+            state_dict=aggregated_state_dict_namespace,
+            uids=valid_uids,  # Include the list of UIDs that provided gradients
         )
 
     async def upload_large_file(self, file_path: str, filename: str) -> bool:
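After this change, `gather` returns `None` when no peer responded, and otherwise a `SimpleNamespace` whose `state_dict` holds one list per parameter key, index-aligned with `uids`. A small sketch of that consumer-side contract with dummy tensors; the field names follow the return statement above, and the `__dict__` lookup mirrors the validator loop earlier in this commit.

```python
from types import SimpleNamespace
import torch

def consume(step_grads) -> None:
    """Consumer-side contract for gather(), per the return statement above."""
    if step_grads is None:  # gather() returns None when no UID responded
        print("No gradients received, waiting for next window.")
        return
    for idx, uid in enumerate(step_grads.uids):
        # Same access pattern as the validator: key is param name + suffix.
        idxs = step_grads.state_dict.__dict__["weight" + "idxs"][idx]
        vals = step_grads.state_dict.__dict__["weight" + "vals"][idx]
        print(uid, idxs.tolist(), vals.tolist())

# Dummy result mirroring the SimpleNamespace built in gather() above.
consume(SimpleNamespace(
    time=0.5, upload_bytes=0, download_bytes=2048,
    success_rate=2 / 3, successes=[True, False, True],
    state_dict=SimpleNamespace(**{
        "weightidxs": [torch.tensor([0, 2]), torch.tensor([1, 3])],
        "weightvals": [torch.tensor([0.1, -0.4]), torch.tensor([0.2, 0.7])],
    }),
    uids=[11, 23],  # only peers that actually returned gradients
))
consume(None)
```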
@@ -671,7 +693,6 @@ async def cleanup_old_checkpoints(self, keep_last: int = 3):
     def get_highest_stake_validator(self) -> Tuple[Optional[int], float]:
         """Returns the UID and stake of the neuron with the highest stake."""
         stakes = self.metagraph.S
-        tplr.logger.info(stakes)
 
         # Convert numpy array to torch tensor if needed
         if isinstance(stakes, np.ndarray):
