From c11053cb83c0942a3f56fc3b1a6c26a7b09cc9a3 Mon Sep 17 00:00:00 2001 From: otariidae Date: Sun, 16 Oct 2022 20:32:03 +0900 Subject: [PATCH 1/2] fix to handle ConnectionClosedError --- .../videoCast/one_to_multiple_cast_skyway.py | 166 +++++++++--------- 1 file changed, 85 insertions(+), 81 deletions(-) diff --git a/frontend/videoCast/one_to_multiple_cast_skyway.py b/frontend/videoCast/one_to_multiple_cast_skyway.py index 330dee2f..967f2837 100644 --- a/frontend/videoCast/one_to_multiple_cast_skyway.py +++ b/frontend/videoCast/one_to_multiple_cast_skyway.py @@ -3,6 +3,7 @@ import asyncio import websockets +from websockets.exceptions import ConnectionClosedError import json import ssl import os @@ -42,46 +43,64 @@ async def handler(websocket, path): async with lock: connections.append(websocket) - async for message in websocket: # 受信 - dictionary = json.loads(message) - # msg_type, room_idで構成 - # connect_senderの時のみ+peer_id - promises = [] - msg_type = dictionary["msg_type"] - room_id = dictionary["room_id"] - - if room_id not in rooms: - room = { - "sender_socket": None, - "peer_id": None, - "skyway_room_id": None, - "connections": [websocket], - "connect_num": 0, - # ルームの累積接続数が1000行くと通信が弾かれるのでその前にルームを切り替え - } - async with lock: - rooms[room_id] = room - else: - room = rooms[room_id] - if websocket not in room["connections"]: + try: + async for message in websocket: # 受信 + dictionary = json.loads(message) + # msg_type, room_idで構成 + # connect_senderの時のみ+peer_id + promises = [] + msg_type = dictionary["msg_type"] + room_id = dictionary["room_id"] + + if room_id not in rooms: + room = { + "sender_socket": None, + "peer_id": None, + "skyway_room_id": None, + "connections": [websocket], + "connect_num": 0, + # ルームの累積接続数が1000行くと通信が弾かれるのでその前にルームを切り替え + } async with lock: - room["connections"].append(websocket) - # 現在の通信のwebsocketが入ったroom_idのroomが存在することを保証 - - if msg_type == "connect_sender": - if sender_token is None or sender_token == dictionary["sender_token"]: - async with lock: # if room["sender_socket"] is None: - print("sender_connect") - room["sender_socket"] = websocket - room["skyway_room_id"] = dictionary["skyway_room_id"] - room["connect_num"] = 0 - # senderは上書き - room["peer_id"] = dictionary["peer_id"] - for connection in room["connections"]: - if connection is websocket: - continue - print("send") - promise = connection.send( + rooms[room_id] = room + else: + room = rooms[room_id] + if websocket not in room["connections"]: + async with lock: + room["connections"].append(websocket) + # 現在の通信のwebsocketが入ったroom_idのroomが存在することを保証 + + if msg_type == "connect_sender": + if sender_token is None or sender_token == dictionary["sender_token"]: + async with lock: # if room["sender_socket"] is None: + print("sender_connect") + room["sender_socket"] = websocket + room["skyway_room_id"] = dictionary["skyway_room_id"] + room["connect_num"] = 0 + # senderは上書き + room["peer_id"] = dictionary["peer_id"] + for connection in room["connections"]: + if connection is websocket: + continue + print("send") + promise = connection.send( + json.dumps( + { + "msg_type": "connect_receiver", + "room_id": room_id, + "skyway_room_id": room["skyway_room_id"], + "peer_id": room["peer_id"], + } + ) + ) + room["connect_num"] += 1 + promises.append(promise) + elif msg_type == "connect_receiver": + print("connect_receiver") + if room["sender_socket"] is not None: + print("send") + if room["connect_num"] < MAX_CONNECT_NUM: + promise = websocket.send( json.dumps( { "msg_type": "connect_receiver", @@ -91,53 +110,38 @@ async def handler(websocket, path): } ) ) - room["connect_num"] += 1 + + async with lock: + room["connect_num"] += 1 promises.append(promise) - elif msg_type == "connect_receiver": - print("connect_receiver") - if room["sender_socket"] is not None: - print("send") - if room["connect_num"] < MAX_CONNECT_NUM: - promise = websocket.send( - json.dumps( - { - "msg_type": "connect_receiver", - "room_id": room_id, - "skyway_room_id": room["skyway_room_id"], - "peer_id": room["peer_id"], - } + else: + # ルームの累積接続数が溢れそうだったら新しい部屋をsenderに作ってもらう + promise = room["sender_socket"].send( + json.dumps( + {"msg_type": "request_reconnect_sender", "room_id": room_id} + ) ) - ) - + promises.append(promise) + async with lock: + room["connect_num"] = 0 + elif msg_type == "exit_room": + if websocket in room["connections"]: async with lock: - room["connect_num"] += 1 - promises.append(promise) - else: - # ルームの累積接続数が溢れそうだったら新しい部屋をsenderに作ってもらう - promise = room["sender_socket"].send( - json.dumps( - {"msg_type": "request_reconnect_sender", "room_id": room_id} - ) - ) - promises.append(promise) + room["connections"].remove(websocket) + if room["sender_socket"] is websocket: async with lock: + room["sender_socket"] = None + room["peer_id"] = None + room["skyway_room_id"] = None room["connect_num"] = 0 - elif msg_type == "exit_room": - if websocket in room["connections"]: - async with lock: - room["connections"].remove(websocket) - if room["sender_socket"] is websocket: - async with lock: - room["sender_socket"] = None - room["peer_id"] = None - room["skyway_room_id"] = None - room["connect_num"] = 0 - print("{}: {}".format(path, dictionary)) - for p in promises: - try: - a = await p - except: - pass + print("{}: {}".format(path, dictionary)) + for p in promises: + try: + a = await p + except: + pass + except ConnectionClosedError: + pass # 接続が切れたらその接続を削除 for room_id, room in rooms.items(): From 9afa0a0ad907a8276caf55b8134b22f4606e6b3c Mon Sep 17 00:00:00 2001 From: format BOT Date: Sun, 16 Oct 2022 11:36:11 +0000 Subject: [PATCH 2/2] format by black --- frontend/videoCast/one_to_multiple_cast_skyway.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/frontend/videoCast/one_to_multiple_cast_skyway.py b/frontend/videoCast/one_to_multiple_cast_skyway.py index 967f2837..f797051f 100644 --- a/frontend/videoCast/one_to_multiple_cast_skyway.py +++ b/frontend/videoCast/one_to_multiple_cast_skyway.py @@ -118,7 +118,10 @@ async def handler(websocket, path): # ルームの累積接続数が溢れそうだったら新しい部屋をsenderに作ってもらう promise = room["sender_socket"].send( json.dumps( - {"msg_type": "request_reconnect_sender", "room_id": room_id} + { + "msg_type": "request_reconnect_sender", + "room_id": room_id, + } ) ) promises.append(promise)