Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix to handle ConnectionClosedError #140

Merged
merged 3 commits into from
Oct 16, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 88 additions & 81 deletions frontend/videoCast/one_to_multiple_cast_skyway.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import asyncio
import websockets
from websockets.exceptions import ConnectionClosedError
import json
import ssl
import os
Expand Down Expand Up @@ -39,46 +40,64 @@ async def handler(websocket, path):
async with lock:
connections.append(websocket)

async for message in websocket: # 受信
dictionary = json.loads(message)
# msg_type, room_idで構成
# connect_senderの時のみ+peer_id
promises = []
msg_type = dictionary["msg_type"]
room_id = dictionary["room_id"]

if room_id not in rooms:
room = {
"sender_socket": None,
"peer_id": None,
"skyway_room_id": None,
"connections": [websocket],
"connect_num": 0,
# ルームの累積接続数が1000行くと通信が弾かれるのでその前にルームを切り替え
}
async with lock:
rooms[room_id] = room
else:
room = rooms[room_id]
if websocket not in room["connections"]:
try:
async for message in websocket: # 受信
dictionary = json.loads(message)
# msg_type, room_idで構成
# connect_senderの時のみ+peer_id
promises = []
msg_type = dictionary["msg_type"]
room_id = dictionary["room_id"]

if room_id not in rooms:
room = {
"sender_socket": None,
"peer_id": None,
"skyway_room_id": None,
"connections": [websocket],
"connect_num": 0,
# ルームの累積接続数が1000行くと通信が弾かれるのでその前にルームを切り替え
}
async with lock:
room["connections"].append(websocket)
# 現在の通信のwebsocketが入ったroom_idのroomが存在することを保証

if msg_type == "connect_sender":
if SENDER_TOKEN is None or SENDER_TOKEN == dictionary["sender_token"]:
async with lock: # if room["sender_socket"] is None:
print("sender_connect")
room["sender_socket"] = websocket
room["skyway_room_id"] = dictionary["skyway_room_id"]
room["connect_num"] = 0
# senderは上書き
room["peer_id"] = dictionary["peer_id"]
for connection in room["connections"]:
if connection is websocket:
continue
print("send")
promise = connection.send(
rooms[room_id] = room
else:
room = rooms[room_id]
if websocket not in room["connections"]:
async with lock:
room["connections"].append(websocket)
# 現在の通信のwebsocketが入ったroom_idのroomが存在することを保証

if msg_type == "connect_sender":
if SENDER_TOKEN is None or SENDER_TOKEN == dictionary["sender_token"]:
async with lock: # if room["sender_socket"] is None:
print("sender_connect")
room["sender_socket"] = websocket
room["skyway_room_id"] = dictionary["skyway_room_id"]
room["connect_num"] = 0
# senderは上書き
room["peer_id"] = dictionary["peer_id"]
for connection in room["connections"]:
if connection is websocket:
continue
print("send")
promise = connection.send(
json.dumps(
{
"msg_type": "connect_receiver",
"room_id": room_id,
"skyway_room_id": room["skyway_room_id"],
"peer_id": room["peer_id"],
}
)
)
room["connect_num"] += 1
promises.append(promise)
elif msg_type == "connect_receiver":
print("connect_receiver")
if room["sender_socket"] is not None:
print("send")
if room["connect_num"] < MAX_CONNECT_NUM:
promise = websocket.send(
json.dumps(
{
"msg_type": "connect_receiver",
Expand All @@ -88,53 +107,41 @@ async def handler(websocket, path):
}
)
)
room["connect_num"] += 1

async with lock:
room["connect_num"] += 1
promises.append(promise)
elif msg_type == "connect_receiver":
print("connect_receiver")
if room["sender_socket"] is not None:
print("send")
if room["connect_num"] < MAX_CONNECT_NUM:
promise = websocket.send(
json.dumps(
{
"msg_type": "connect_receiver",
"room_id": room_id,
"skyway_room_id": room["skyway_room_id"],
"peer_id": room["peer_id"],
}
else:
# ルームの累積接続数が溢れそうだったら新しい部屋をsenderに作ってもらう
promise = room["sender_socket"].send(
json.dumps(
{
"msg_type": "request_reconnect_sender",
"room_id": room_id,
}
)
)
)

promises.append(promise)
async with lock:
room["connect_num"] = 0
elif msg_type == "exit_room":
if websocket in room["connections"]:
async with lock:
room["connect_num"] += 1
promises.append(promise)
else:
# ルームの累積接続数が溢れそうだったら新しい部屋をsenderに作ってもらう
promise = room["sender_socket"].send(
json.dumps(
{"msg_type": "request_reconnect_sender", "room_id": room_id}
)
)
promises.append(promise)
room["connections"].remove(websocket)
if room["sender_socket"] is websocket:
async with lock:
room["sender_socket"] = None
room["peer_id"] = None
room["skyway_room_id"] = None
room["connect_num"] = 0
elif msg_type == "exit_room":
if websocket in room["connections"]:
async with lock:
room["connections"].remove(websocket)
if room["sender_socket"] is websocket:
async with lock:
room["sender_socket"] = None
room["peer_id"] = None
room["skyway_room_id"] = None
room["connect_num"] = 0
print("{}: {}".format(path, dictionary))
for p in promises:
try:
a = await p
except:
pass
print("{}: {}".format(path, dictionary))
for p in promises:
try:
a = await p
otariidae marked this conversation as resolved.
Show resolved Hide resolved
except:
pass
except ConnectionClosedError:
pass

# 接続が切れたらその接続を削除
for room_id, room in rooms.items():
Expand Down