From 2c81e7a0d6f22c3574186651cd1e4a486ed41d81 Mon Sep 17 00:00:00 2001 From: back-to Date: Wed, 23 Dec 2020 18:53:08 +0100 Subject: [PATCH] server: use subprocess.Popen, add Youtube-DL support Streamlink is not required anymore, Youtube-DL can now be used instead. closes https://github.com/back-to/liveproxy/issues/16 --- liveproxy/server.py | 93 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 83 insertions(+), 10 deletions(-) diff --git a/liveproxy/server.py b/liveproxy/server.py index e2cc9ad..02bedff 100644 --- a/liveproxy/server.py +++ b/liveproxy/server.py @@ -2,10 +2,16 @@ import errno import logging import os +import re import shlex import socket +import subprocess +import sys from http.server import BaseHTTPRequestHandler, HTTPServer +from select import select +from shutil import which from socketserver import ThreadingMixIn +from time import time from urllib.parse import parse_qsl, unquote, urlparse ACCEPTABLE_ERRNO = ( @@ -19,13 +25,15 @@ except AttributeError: pass # Not windows +_re_streamlink = re.compile(r'streamlink(?:\.exe)?$') +_re_youtube_dl = re.compile(r'youtube[_-]dl(?:\.exe)?$') log = logging.getLogger(__name__.replace('liveproxy.', '')) -def arglist_from_query(path): +def arglist_from_query(path, prog='streamlink'): old_data = parse_qsl(urlparse(path).query) - arglist = [] + arglist = [prog] for k, v in old_data: if k == 'q': # backwards compatibility --q @@ -54,21 +62,84 @@ def do_HEAD(self): def do_GET(self): '''Respond to a GET request.''' + random_id = hex(int(time()))[5:] + log = logging.getLogger(f'liveproxy.{random_id}.{__name__}') + if self.path.startswith(('/play/', '/streamlink/', '/301/', '/streamlink_301/')): # http://127.0.0.1:53422/play/?url=https://foo.bar&q=worst arglist = arglist_from_query(self.path) - main_play(self, arglist) elif self.path.startswith(('/base64/')): # http://127.0.0.1:53422/base64/STREAMLINK-COMMANDS/ - base64_path = self.path[8:] - if base64_path.endswith('/'): - base64_path = base64_path[:-1] - arglist = shlex.split(base64.urlsafe_b64decode(base64_path).decode('UTF-8')) - if arglist[0].lower() == 'streamlink': - arglist = arglist[1:] - main_play(self, arglist) + # http://127.0.0.1:53422/base64/YOUTUBE-DL-COMMANDS/ + try: + arglist = shlex.split(base64.urlsafe_b64decode(self.path.split('/')[2]).decode('UTF-8')) + except base64.binascii.Error as err: + log.error(f'invalid base64 URL: {err}') + self._headers(404, 'text/html', connection='close') + return else: self._headers(404, 'text/html', connection='close') + return + + log.info(f'User-Agent: {self.headers.get("User-Agent", "???")}') + log.info(f'Client: {self.client_address}') + log.info(f'Address: {self.address_string()}') + + prog = which(arglist[0], mode=os.F_OK | os.X_OK) + if not prog: + log.debug(f'invalid prog: {prog}') + return + + log.debug(f'prog: {prog}') + if _re_streamlink.search(prog): + arglist.extend(['--stdout', '--loglevel', 'none']) + elif _re_youtube_dl.search(prog): + arglist.extend(['--o', '-', '--quiet', '--no-playlist', '--no-warnings', '--no-progress']) + else: + log.error('currently unsupported programm') + return + + self._headers(200, 'video/unknown') + process = subprocess.Popen(arglist, + stderr=subprocess.PIPE, + stdin=None, + stdout=subprocess.PIPE, + shell=False, + ) + + log.info(f'Stream started {random_id}') + try: + while True: + reads, _, _ = select([process.stdout.fileno(), process.stderr.fileno()], [], []) + for descriptor in reads: + # stdout + if descriptor == process.stdout.fileno(): + read = process.stdout.readline() + if read: + self.wfile.write(read) + # stderr + if descriptor == process.stderr.fileno(): + read = process.stderr.readline() + if read: + log.debug(f'{read[0:-1].decode("utf-8")}') + sys.stdout.flush() + if process.poll() is not None: + self.wfile.close() + break + except socket.error as e: + if isinstance(e.args, tuple): + if e.errno == errno.EPIPE: + # remote peer disconnected + log.info('Detected remote disconnect') + else: + log.error(f'E1: {e!r}') + else: + log.error(f'E2: {e!r}') + + log.info(f'Stream ended {random_id}') + process.terminate() + process.wait() + process.kill() class Server(HTTPServer): @@ -79,6 +150,8 @@ def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" try: self.RequestHandlerClass(request, client_address, self) + except ValueError: + pass except socket.error as err: if err.errno not in ACCEPTABLE_ERRNO: raise