Skip to content

Commit

Permalink
Project Name: BattleBot
Browse files Browse the repository at this point in the history
Commit Type: Feature

Added a ping timer, Autorerestart and more

Changelog
- Added a ping so the ping time can be seen every 10 seconds
- Changed the way data is send on the client to the server
- Added auto restart on connection lost on the server side
- Restructerd how the websocket connection is made on the server

Signed-off-by: ButrosGroot <[email protected]>
  • Loading branch information
butros10games committed Feb 18, 2024
1 parent 7e6c6cd commit ba8bf20
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 21 deletions.
37 changes: 30 additions & 7 deletions src/client/communications.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import websockets
import json
import asyncio
from time import perf_counter

from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate

class WebSocketClient:
Expand Down Expand Up @@ -34,12 +37,32 @@ async def connect(self):
self.ws = ws
await self.setup_data_channel()
await self.create_and_send_offer()
self.ping_task = asyncio.create_task(self.ping_timer())
await self.receive_messages()

async def setup_data_channel(self):
self.data_channel = self.pc.createDataChannel("dataChannel")
self.data_channel.on("open", lambda: print("Data Channel is open"))
self.data_channel.on("message", lambda message: print(f"Message from Data Channel: {message}"))
self.data_channel.on("message", self.on_data_channel_message)

async def on_data_channel_message(self, message):
message = json.loads(message)

if "pong" in message:
current_time = perf_counter()
pong_time = message["pong"]

ping_time = (current_time - pong_time) * 1000 # convert to milliseconds

print(f"pong received. {ping_time} ms")
else:
print(f"Message from Data Channel: {message}")

async def ping_timer(self):
while True:
await asyncio.sleep(10)
current_time = perf_counter()
await self.send_command({"ping": current_time})

async def create_and_send_offer(self):
offer = await self.pc.createOffer()
Expand All @@ -55,8 +78,6 @@ async def receive_messages(self):
await self.handle_new_ice_candidate(data)

async def handle_answer(self, data):
print(f"Received answer: {data}")

message_type = data["type"]
if message_type == "answer":
answer = RTCSessionDescription(sdp=data["sdp"], type=message_type)
Expand All @@ -70,16 +91,18 @@ async def handle_new_ice_candidate(self, data):
candidate = RTCIceCandidate(sdpMLineIndex=data["sdpMLineIndex"], candidate=data["candidate"])
await self.pc.addIceCandidate(candidate)

async def send_command(self, action, value):
async def send_command(self, command):
if hasattr(self, 'data_channel') and self.data_channel.readyState == "open":
command = json.dumps({"action": action, "value": value})
try:
self.data_channel.send(command)
self.data_channel.send(json.dumps(command))
except Exception as e:
print(f"Error sending message: {e}")
print(f"Error sending message: {e}, traceback: {e.__traceback__}")
else:
print("Data channel is not open or not set up yet.")

async def close(self):
if self.ping_task:
self.ping_task.cancel()

await self.pc.close()
await self.ws.close()
2 changes: 1 addition & 1 deletion src/client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,4 @@ async def state_change(self, state):
}
action, value = action_map.get(state, ("stop", 0))

await self.net_client.send_command(action, value)
await self.net_client.send_command({"action": action, "value": value})
45 changes: 32 additions & 13 deletions src/server/communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,33 @@ def __init__(self, motor_controller, battlebot_name):
self.motor_controller = motor_controller
self.battlebot_name = battlebot_name
self.pc = RTCPeerConnection()
self.websocket = None

async def on_ice_connection_state_change(self, event=None):
print(f"ICE connection state is {self.pc.iceConnectionState}")
if self.pc.iceConnectionState in ["failed", "disconnected", "closed"]:
print("ICE connection lost, setting up for reconnect...")
self.pc = RTCPeerConnection()

async def connect_to_signal_server(self):
self.ws_url = f"wss://butrosgroot.com/ws/battle_bot/signal/{self.battlebot_name}/"
print(f"Connecting to signaling server at {self.ws_url}")
async with websockets.connect(self.ws_url) as websocket:
print("Connected to the signaling server")
await self.handle_signaling(websocket)

async def handle_signaling(self, websocket):
self.websocket = await websockets.connect(self.ws_url)
print("Connected to signaling server.")

async def handle_signaling(self):
try:
async for message in websocket:
async for message in self.websocket:
data = json.loads(message)

if "sdp" in data:
await self.handle_sdp(data, websocket)
await self.handle_sdp(data)
elif "ice" in data:
await self.handle_ice(data)
except Exception as e:
print(f"Error in handle_signaling: {e}")

async def handle_sdp(self, data, websocket):
async def handle_sdp(self, data):
description = RTCSessionDescription(sdp=data["sdp"], type=data["type"])
# Check signaling state before setting remote description
if self.pc.signalingState == "stable" and description.type == "answer":
Expand All @@ -84,7 +90,7 @@ async def handle_sdp(self, data, websocket):

if description.type == "offer":
await self.pc.setLocalDescription(await self.pc.createAnswer())
await websocket.send(json.dumps({"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}))
await self.websocket.send(json.dumps({"sdp": self.pc.localDescription.sdp, "type": self.pc.localDescription.type}))
self.pc.on("datachannel", self.on_data_channel)
elif description.type == "answer":
print("Unexpected answer received, ignoring.")
Expand All @@ -96,23 +102,36 @@ async def handle_ice(self, data):
await self.pc.addIceCandidate(candidate)

async def on_data_channel(self, event):
data_channel = event
data_channel.on("open", self.on_data_channel_open)
data_channel.on("message", self.on_data_channel_message)
self.data_channel = event
self.data_channel.on("open", self.on_data_channel_open)
self.data_channel.on("message", self.on_data_channel_message)
self.pc.on("iceconnectionstatechange", self.on_ice_connection_state_change)

async def on_data_channel_open(self):
print("Data Channel is open")

async def on_data_channel_message(self, message):
data = json.loads(message)
print(f"Received data: {data}")
if 'ping' in data:
await self.send_data({'pong': data['ping']})
if 'action' in data and 'value' in data:
print(f"Received data: {data}")
self.motor_controller.action(data['action'], data['value'])

async def send_data(self, message):
if hasattr(self, 'data_channel') and self.data_channel.readyState == "open":
try:
self.data_channel.send(json.dumps(message))
except Exception as e:
print(f"Error sending message: {e}")
else:
print("Data channel is not open or not set up yet.")

async def run(self):
while True:
try:
await self.connect_to_signal_server()
await self.handle_signaling()
except Exception as e:
print(f"Connection lost, error: {e}. Retrying...")
await asyncio.sleep(5)

0 comments on commit ba8bf20

Please sign in to comment.