-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathshittyserver.py
481 lines (404 loc) · 16.2 KB
/
shittyserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
import asyncio
import configparser
import json
import os
import re
import sys
import uuid
from datetime import datetime, timezone
from types import TracebackType
from typing import Optional, Any, Type, Dict, List
import aiohttp
import av # type: ignore
import jack as Jack
from jack import OwnPort
import websockets.exceptions, websockets.server, websockets.connection
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription # type: ignore
from aiortc.mediastreams import MediaStreamError # type: ignore
import sentry_sdk
# Runtime settings are read from "serverconfig.ini"; the relative path is
# resolved against the process's current working directory.
config = configparser.RawConfigParser()
config.read("serverconfig.ini")

# Sentry reporting is opt-in: only the exact string "True" in
# [sentry] enable turns it on. traces_sample_rate=1.0 is passed to the SDK.
if config.get("sentry", "enable") == "True":
    sentry_sdk.init(config.get("sentry", "dsn"), traces_sample_rate=1.0)
# Matches a whole "ws=<digit>" line anywhere in the state file. MULTILINE is
# required so that ^/$ anchor at line boundaries rather than only at the very
# start/end of the file contents.
file_contents_ex = re.compile(r"^ws=\d$", re.MULTILINE)


def write_ob_status(status: bool, path: str = "/music/ob_state.conf") -> None:
    """Record in the OB state file whether a WebStudio session is connected.

    The file is a list of ``key=value`` lines. An existing ``ws=<digit>``
    line is rewritten in place to ``ws=1`` (*status* true) or ``ws=0``;
    all other lines are preserved. If no such line exists, one is appended
    (preceded by a newline if the file does not already end with one).

    Args:
        status: True to write ``ws=1``, False to write ``ws=0``.
        path: Location of the state file. Defaults to the production path.

    Returns:
        None. If the file does not exist, a message is printed and nothing
        is created or written.
    """
    line = "ws=" + str(1 if status else 0)
    try:
        fd = open(path, "r+")
    except FileNotFoundError:
        print("OB State file does not exist. Bailing.")
        return
    with fd:
        content = fd.read()
        if file_contents_ex.search(content):
            content = file_contents_ex.sub(line, content)
        else:
            if content and not content.endswith("\n"):
                content += "\n"
            content += line + "\n"
        # Rewrite from the start; truncate in case the new content is shorter.
        fd.seek(0)
        fd.write(content)
        fd.truncate()
# A list of ICE server descriptions, each a plain dict.
TurnCredentials = List[Dict[str, Any]]


def get_turn_credentials() -> TurnCredentials:
    """Fetch the ICE/TURN server list from the configured provider.

    The provider name comes from ``[shittyserver] turn_provider``. Only
    ``"twilio"`` is supported: a Twilio token is created with the credentials
    from the ``[twilio]`` section and its ``ice_servers`` are returned.

    Raises:
        Exception: if the provider is ``"hardcoded"`` (no longer allowed) or
            any other unrecognised value.
    """
    provider = config.get("shittyserver", "turn_provider")

    if provider == "hardcoded":
        raise Exception("Don't use hardcoded anymore!")
    if provider != "twilio":
        raise Exception("unknown provider " + provider)

    # Imported lazily so the dependency is only touched when actually used.
    from twilio.rest import Client  # type: ignore

    account_sid = config.get("twilio", "account_sid")
    auth_token = config.get("twilio", "auth_token")
    twilio_client = Client(account_sid, auth_token)
    ice_token = twilio_client.tokens.create()
    # Twilio's type stubs declare the wrong type here, hence the ignore.
    # noinspection PyTypeChecker
    return ice_token.ice_servers  # type: ignore
@Jack.set_error_function
def error(msg: str) -> None:
    """Print an error message reported by JACK, prefixed with "Error:"."""
    print(f"Error: {msg}")
@Jack.set_info_function
def info(msg: str) -> None:
    """Print an informational message reported by JACK, prefixed with "Info:"."""
    print(f"Info: {msg}")
# JACK client, registered under the name "webstudio".
jack = Jack.Client("webstudio")
# Two output ports; process() fills out1 from transfer_buffer1 and out2 from
# transfer_buffer2.
out1: OwnPort = jack.outports.register("out_0") # type: ignore
out2: OwnPort = jack.outports.register("out_1") # type: ignore
# Ring buffers carrying audio from the WebRTC receive loop to the JACK process
# callback. They stay None until init_buffers() has run.
transfer_buffer1: Any = None
transfer_buffer2: Any = None
def init_buffers() -> None:
    """Replace both transfer ring buffers with fresh, empty ones.

    Each buffer is created with a size of ten times the JACK sample rate.
    NOTE(review): the unit of that size is whatever ``Jack.RingBuffer``
    takes (presumably bytes, not samples) - confirm against the library.
    """
    global transfer_buffer1, transfer_buffer2
    capacity = jack.samplerate * 10
    transfer_buffer1, transfer_buffer2 = (
        Jack.RingBuffer(capacity),
        Jack.RingBuffer(capacity),
    )


init_buffers()
def _fill_port_from_ring(port_buffer: Any, ring: Any) -> None:
    """Copy as much audio as *ring* holds into *port_buffer*, zero-padding the rest.

    The whole port buffer is always overwritten: whatever the ring buffer
    cannot supply is filled with zero bytes, so a short read never leaves
    stale data from an earlier cycle in the tail of the buffer.
    """
    wanted = len(port_buffer)
    filled = 0
    if ring.read_space != 0:
        piece = ring.read(wanted)
        filled = len(piece)
        port_buffer[:filled] = piece
    if filled < wanted:
        # One slice assignment instead of a per-byte Python loop.
        port_buffer[filled:] = bytes(wanted - filled)


@jack.set_process_callback
def process(frames: int) -> None:
    """JACK process callback: feed both output ports from the transfer buffers.

    Args:
        frames: Frame count supplied by JACK. Unused here; the amount copied
            is derived from the length of each port buffer.
    """
    _fill_port_from_ring(out1.get_buffer(), transfer_buffer1)
    _fill_port_from_ring(out2.get_buffer(), transfer_buffer2)
# Every session that has connected and not yet ended, keyed by connection_id.
active_sessions: Dict[str, "Session"] = {}
# The session currently selected (via the telnet "SEL" command) to be on air,
# or None when no session is selected.
live_session: Optional["Session"] = None
async def notify_mattserver_about_sessions() -> None:
    """POST the current set of active sessions to the configured notify URL.

    The JSON body maps each connection id to that session's ``to_dict()``
    summary. The response object is printed; its status is not checked.
    """
    async with aiohttp.ClientSession() as http:
        payload: Dict[str, Dict[str, str]] = {
            sid: sess.to_dict() for sid, sess in active_sessions.items()
        }
        notify_url = config.get("shittyserver", "notify_url")
        async with http.post(notify_url, json=payload) as response:
            print("Mattserver response", response)
class NotReadyException(BaseException):
    """Signals that something was used before it was ready.

    Derives from ``BaseException`` rather than ``Exception``, so a plain
    ``except Exception`` handler will not catch it.
    """
class Session(object):
    """One client connection: a signalling WebSocket plus a WebRTC peer connection.

    Lifecycle, as driven by the methods below:

    1. ``connect()`` registers the session in ``active_sessions``, sends a
       ``HELLO`` message (connection id + ICE servers) and then processes
       incoming WebSocket messages until the socket goes away.
    2. An ``OFFER`` message makes ``process_ice()`` create the peer
       connection and reply with an ``ANSWER``.
    3. Once an audio track arrives, its frames are resampled and written to
       the module-level transfer buffers, but only while the session is
       ``running`` (toggled by ``activate()`` / ``deactivate()``).
    4. ``end()`` tears everything down exactly once.
    """

    # Signalling socket; None until connect() is called.
    websocket: Optional[websockets.server.WebSocketServerProtocol]
    # Signalling progress: None -> "HELLO" -> "ANSWER".
    connection_state: Optional[str]
    # The RTCPeerConnection; None until an offer has been received.
    pc: Optional[Any]
    # Random UUID4 string identifying this session.
    connection_id: str
    # Creation time of the session object (timezone-aware, UTC).
    connected_at: datetime
    # Serialises end() so teardown runs only once.
    lock: asyncio.Lock
    # True while this session's audio is being forwarded to JACK.
    running: bool
    # True once end() has started tearing the session down.
    ended: bool
    # Lazily created PyAV resampler; None until the first forwarded frame.
    resampler: Optional[Any]

    def __init__(self) -> None:
        """Create an unconnected session with a fresh random connection id."""
        self.websocket = None
        self.sender = None
        self.pc = None
        self.resampler = None
        self.connection_state = None
        self.connection_id = str(uuid.uuid4())
        self.ended = False
        self.lock = asyncio.Lock()
        self.running = False
        self.connected_at = datetime.now(timezone.utc)

    def to_dict(self) -> Dict[str, str]:
        """Return a JSON-serialisable summary: connection id and connect time."""
        return {
            "connection_id": self.connection_id,
            "connected_at": self.connected_at.strftime("%Y-%m-%dT%H:%M:%S%z"),
        }

    async def activate(self) -> None:
        """Start forwarding this session's audio and tell the client ``ACTIVATED``."""
        print(self.connection_id, "Activating")
        self.running = True
        if self.websocket is not None:
            await self.websocket.send(json.dumps({"kind": "ACTIVATED"}))

    async def deactivate(self) -> None:
        """Stop forwarding this session's audio and tell the client ``DEACTIVATED``.

        A WebSocket that is already closed is tolerated: the notification is
        skipped and only a log line is printed.
        """
        print(self.connection_id, "Deactivating")
        self.running = False
        if self.websocket is not None:
            try:
                await self.websocket.send(json.dumps({"kind": "DEACTIVATED"}))
            except websockets.exceptions.ConnectionClosed:
                print(
                    self.connection_id, "not sending DEACTIVATED as it's already closed"
                )

    async def end(self) -> None:
        """Tear the session down; safe to call more than once.

        The first call deactivates the session, closes the peer connection,
        sends ``DIED`` and closes the WebSocket with code 1008 if it is still
        open, removes the session from ``active_sessions`` (writing OB status
        ``False`` when that leaves no sessions), clears ``live_session`` if it
        pointed here, and notifies the external service. Later calls only log
        "already over".
        """
        global active_sessions, live_session

        async with self.lock:
            if self.ended:
                print(self.connection_id, "already over")
                return

            print(self.connection_id, "going away")

            # Set before the first await so concurrent observers see it.
            self.ended = True
            await self.deactivate()

            if self.pc is not None:
                await self.pc.close()

            if (
                self.websocket is not None
                and self.websocket.state == websockets.connection.State.OPEN
            ):
                try:
                    await self.websocket.send(json.dumps({"kind": "DIED"}))
                    await self.websocket.close(1008)
                except websockets.exceptions.ConnectionClosed:
                    print(
                        self.connection_id, "socket already closed, no died message"
                    )

            if self.connection_id in active_sessions:
                print(self.connection_id, "removing from active_sessions")
                del active_sessions[self.connection_id]
                print("active_sessions now")
                print(active_sessions)
                if not active_sessions:
                    write_ob_status(False)
            else:
                print(self.connection_id, "wasn't in active_sessions!")

            if live_session is self:
                live_session = None

            await notify_mattserver_about_sessions()
            print(self.connection_id, "bye bye")

    def create_peerconnection(self) -> None:
        """Create the RTCPeerConnection and attach its event handlers.

        Handlers registered:
          * ``signalingstatechange`` - logs the new signalling state.
          * ``iceconnectionstatechange`` - logs the state and ends the session
            when ICE reports ``"failed"``.
          * ``track`` - for audio tracks, runs the receive loop that feeds the
            transfer buffers.
        """
        self.pc = RTCPeerConnection()
        assert self.pc is not None

        @self.pc.on("signalingstatechange")  # type: ignore
        async def on_signalingstatechange() -> None:
            assert self.pc is not None
            print(
                self.connection_id,
                "Signaling state is {}".format(self.pc.signalingState),
            )

        @self.pc.on("iceconnectionstatechange")  # type: ignore
        async def on_iceconnectionstatechange() -> None:
            if self.pc is None:
                print(
                    self.connection_id,
                    "ICE connection state change, but the PC is None!",
                )
                return

            print(
                self.connection_id,
                "ICE connection state is {}".format(self.pc.iceConnectionState),
            )
            if self.pc.iceConnectionState == "failed":
                print(self.connection_id, "Ending due to ICE connection failure")
                await self.end()

        @self.pc.on("track")  # type: ignore
        async def on_track(track: MediaStreamTrack) -> None:
            print(self.connection_id, "Received track")
            if track.kind != "audio":
                return

            print(self.connection_id, "It's audio.")

            await notify_mattserver_about_sessions()

            @track.on("ended")  # type: ignore
            async def on_ended() -> None:
                print(
                    self.connection_id,
                    "Ending due to {} track end".format(track.kind),
                )
                await self.end()

            write_ob_status(True)

            while True:
                try:
                    frame = await track.recv()
                except MediaStreamError as e:
                    print(self.connection_id, "Ending due to MediaStreamError")
                    print(e)
                    await self.end()
                    break

                # Frames are still received while not running, but dropped.
                if not self.running:
                    continue

                # Jack expects all audio to be 32 bit floating point, while
                # PyAV may give us audio in any format (testing has shown it
                # to be signed 16-bit), so resample into planar float stereo
                # at the JACK sample rate.
                if self.resampler is None:
                    self.resampler = av.audio.resampler.AudioResampler(
                        format="fltp", layout="stereo", rate=jack.samplerate
                    )
                frame.pts = None  # DIRTY HACK
                new_frame = self.resampler.resample(frame)
                # Planar layout: plane 0 feeds the first buffer, plane 1 the second.
                transfer_buffer1.write(new_frame.planes[0])
                transfer_buffer2.write(new_frame.planes[1])

    async def process_ice(self, message: Any) -> None:
        """Handle a signalling message that should be the client's SDP offer.

        Only an ``OFFER`` received while in state ``"HELLO"`` is accepted:
        the peer connection is created, the offer applied, and an ``ANSWER``
        (with an extra ``a=fmtp:111`` line appended to the SDP) is sent back,
        after which the state becomes ``"ANSWER"``. Anything else is logged
        and ignored.

        Args:
            message: Decoded JSON message; must contain ``"kind"`` and, for an
                offer, ``"sdp"`` and ``"type"``.
        """
        if not (self.connection_state == "HELLO" and message["kind"] == "OFFER"):
            print(
                self.connection_id,
                "Incorrect kind {} for state {}".format(
                    message["kind"], self.connection_state
                ),
            )
            return

        offer = RTCSessionDescription(sdp=message["sdp"], type=message["type"])
        print(self.connection_id, "Received offer")

        self.create_peerconnection()
        assert self.pc is not None

        await self.pc.setRemoteDescription(offer)

        answer = await self.pc.createAnswer()
        # NOTE(review): payload type 111 is presumably Opus; this forces
        # stereo, constant bitrate and no in-band FEC - confirm with the client.
        answer.sdp += "\na=fmtp:111 minptime=10;useinbandfec=0;maxaveragebitrate=262144;stereo=1;sprop-stereo=1;cbr=1"
        await self.pc.setLocalDescription(answer)

        assert self.websocket is not None
        await self.websocket.send(
            json.dumps(
                {
                    "kind": "ANSWER",
                    "type": self.pc.localDescription.type,
                    "sdp": self.pc.localDescription.sdp,
                }
            )
        )
        self.connection_state = "ANSWER"
        print(self.connection_id, "Sent answer")

    async def connect(
        self, websocket: websockets.server.WebSocketServerProtocol
    ) -> None:
        """Run the signalling conversation on *websocket* until it ends.

        Registers the session, sends ``HELLO`` and then dispatches incoming
        messages: ``OFFER`` goes to ``process_ice()``, ``TIME`` is answered
        with the server's local time, anything else gets an ``ERROR`` reply.

        The session is always ended when this method exits - whether the
        socket closed cleanly, died, or a handler raised - so it is never
        left behind in ``active_sessions``. Exceptions other than
        ``ConnectionClosed`` still propagate after cleanup.

        Args:
            websocket: The accepted WebSocket connection for this client.
        """
        global active_sessions

        active_sessions[self.connection_id] = self
        self.websocket = websocket
        self.connection_state = "HELLO"
        print(self.connection_id, "Connected")

        try:
            ice_config = get_turn_credentials()
            print(self.connection_id, "Obtained ICE")
            sentry_sdk.set_context("session", {"session_id": self.connection_id})

            await websocket.send(
                json.dumps(
                    {
                        "kind": "HELLO",
                        "connectionId": self.connection_id,
                        "iceServers": ice_config,
                    }
                )
            )

            async for msg in websocket:
                data = json.loads(msg)
                if data["kind"] == "OFFER":
                    await self.process_ice(data)
                elif data["kind"] == "TIME":
                    time = datetime.now().time()
                    await websocket.send(
                        json.dumps({"kind": "TIME", "time": str(time)})
                    )
                else:
                    print(self.connection_id, "Unknown kind {}".format(data["kind"]))
                    await websocket.send(
                        json.dumps({"kind": "ERROR", "error": "unknown_kind"})
                    )
        except websockets.exceptions.ConnectionClosed:
            print(self.connection_id, "Ending due to dead WebSocket")
        finally:
            # end() is idempotent, so this is safe even if something else
            # (ICE failure, track end) already ended the session.
            await self.end()
async def serve(
    websocket: websockets.server.WebSocketServerProtocol, path: str
) -> None:
    """WebSocket connection handler: start a session for the ``/stream`` path.

    Connections on any other path are not served; the handler simply returns.

    Args:
        websocket: The accepted connection.
        path: Request path of the WebSocket handshake.
    """
    if path != "/stream":
        return
    new_session = Session()
    await new_session.connect(websocket)
# WebSocket server for client signalling, using serve() as the connection
# handler. host=None is passed through to the library; the port comes from
# [shittyserver] websocket_port. The returned object is handed to
# asyncio.gather() at the bottom of the file.
start_server = websockets.server.serve(
    serve, host=None, port=int(config.get("shittyserver", "websocket_port"))
)

# Startup banner for the WebSocket listener.
print(
    "Shittyserver WS starting on port {}.".format(
        config.get("shittyserver", "websocket_port")
    )
)
async def _select_live_session(sid: str) -> str:
    """Apply a ``SEL <sid>`` request and return the reply (without line ending).

    ``NUL`` deselects the current live session. Any other value must be the
    id of an active session, which is activated after deactivating the
    previously live one (if any).

    Returns:
        ``"OKAY"`` on success, otherwise a ``"WONT <reason>"`` /
        ``"OOPS <reason>"`` refusal.
    """
    global live_session

    if sid == "NUL":
        if live_session is None:
            return "WONT no_live_session"
        await live_session.deactivate()
        live_session = None
        return "OKAY"

    if sid not in active_sessions:
        return "WONT no_such_session"

    session = active_sessions[sid]
    if session is None:
        # Defensive: the registry should never hold None values.
        return "OOPS no_such_session"
    if live_session is not None and live_session.connection_id == sid:
        return "WONT already_live"

    if live_session is not None:
        await live_session.deactivate()
    await session.activate()
    live_session = session
    return "OKAY"


async def telnet_server(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """Serve one control connection speaking a line-based text protocol.

    Commands (space-separated, one per line):
      * ``Q`` - reply with a JSON object holding the ``live`` session (or
        null) and all ``active`` sessions.
      * ``SEL <sid>`` - make session ``<sid>`` live; ``SEL NUL`` deselects.
        Replies ``OKAY`` or ``WONT``/``OOPS`` plus a reason.
      * anything else, including ``SEL`` without an argument - ``WHAT``.

    Every reply is terminated with CRLF. Lines that are not valid UTF-8 are
    logged and skipped. The writer is always closed when the handler exits,
    including when it exits with an exception.

    Args:
        reader: Stream to read command lines from.
        writer: Stream to write replies to.
    """
    try:
        while True:
            data = await reader.readline()
            if not data:
                # EOF: the peer closed the connection.
                break
            try:
                data_str = data.decode("utf-8")
            except UnicodeDecodeError as e:
                print(e)
                continue

            parts = data_str.rstrip().split(" ")
            print(parts)
            command = parts[0]

            if command == "Q":
                result: Dict[str, Dict[str, str]] = {
                    sid: sess.to_dict() for sid, sess in active_sessions.items()
                }
                reply = json.dumps(
                    {
                        "live": live_session.to_dict()
                        if live_session is not None
                        else None,
                        "active": result,
                    }
                )
            elif command == "SEL" and len(parts) > 1:
                reply = await _select_live_session(parts[1])
                print(reply)
            else:
                # Unknown command, or SEL with its argument missing.
                reply = "WHAT"

            writer.write((reply + "\r\n").encode("utf-8"))
            await writer.drain()
    finally:
        writer.close()
async def run_telnet_server() -> None:
    """Listen on localhost for control connections and serve them forever.

    The port is read from ``[shittyserver] telnet_port``; each connection is
    handled by ``telnet_server``.
    """
    port = int(config.get("shittyserver", "telnet_port"))
    listener = await asyncio.start_server(telnet_server, "localhost", port)
    await listener.serve_forever()
# Activate the JACK client (process() was registered as its callback above).
jack.activate()

# Startup banner for the control listener.
print(
    "Shittyserver TELNET starting on port {}".format(
        config.get("shittyserver", "telnet_port")
    )
)

# Send one notification before serving; at this point active_sessions is
# still empty, so the payload is an empty object.
asyncio.get_event_loop().run_until_complete(notify_mattserver_about_sessions())

# Start the WebSocket server and the control server together.
# run_telnet_server() serves forever, so this call is not expected to return.
asyncio.get_event_loop().run_until_complete(
    asyncio.gather(start_server, run_telnet_server())
)
# NOTE(review): presumably only reached if the gather above returns - confirm
# whether this line is still needed.
asyncio.get_event_loop().run_forever()