-
Notifications
You must be signed in to change notification settings - Fork 9
/
webinar.py
executable file
·336 lines (237 loc) · 9.17 KB
/
webinar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
#! /usr/bin/env python3
import json
import logging
import re
import shutil
import subprocess
from functools import partial
from pathlib import Path
from random import choice
from time import sleep
import requests
from requests import Session
# Module-level logger used by the dumpers below to report progress.
LOGGER = logging.getLogger(__name__)
def configure_logging(
*,
level: int = None,
logger: logging.Logger = None,
fmt: str = '%(message)s'
):
"""Switches on logging at a given level. For a given logger or globally.
:param level:
:param logger:
:param fmt:
"""
logging.basicConfig(format=fmt, level=level if logger else None)
logger and logger.setLevel(level or logging.INFO)
# Switch on output for this module's logger; no level is passed, so INFO is used.
configure_logging(logger=LOGGER)
def get_user_input(prompt: str, *, choices: list[str] = None) -> str:
    """Asks the user for a value until an acceptable one is entered.

    Surrounding whitespace is stripped from the answer. Empty answers are
    rejected, and so are answers missing from a non-empty `choices`.

    :param prompt: Text shown to the user (a colon and a space are appended).
    :param choices: Optional allowed answers. Any non-empty answer
        is accepted when omitted or empty.
    """
    allowed = set(choices or [])

    while True:
        answer = input(f'{prompt}: ').strip()

        if answer and (not allowed or answer in allowed):
            return answer
class Dumper:
    """Base class for video dumpers.

    Asks the user for the values described in ``_user_input_map``,
    downloads the ``.ts`` chunks listed in an m3u8 playlist and joins
    them into a single mp4 file using ffmpeg.

    Subclasses are registered automatically in ``registry``
    and must implement ``_gather``.
    """

    # Human readable dumper name (returned by ``str()``).
    title: str = ''

    # Maps a ``_gather`` keyword argument name to the prompt shown to the user.
    _user_input_map: dict[str, str] = None

    # Headers set on every session created by ``_get_session``.
    _headers: dict[str, str] = {
        'Connection': 'keep-alive',
        'Accept': '*/*',
        'User-Agent': (
            'Mozilla/5.0 (X11; Linux x86_64) '
            'AppleWebKit/537.36 (KHTML, like Gecko) '
            'Chrome/79.0.3945.136 YaBrowser/20.2.3.320 (beta) Yowser/2.5 Safari/537.36'
        ),
        'Sec-Fetch-Site': 'same-site',
        'Sec-Fetch-Mode': 'cors',
        'Accept-Language': 'ru,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, sdch, br',
    }

    # All subclasses, in definition order.
    registry = []

    def __init_subclass__(cls):
        super().__init_subclass__()
        cls.registry.append(cls)

    def __init__(self):
        self._user_input_map = self._user_input_map or {}
        self._session = self._get_session()
        # When set, a short random pause is made after every downloaded chunk.
        self._sleepy = True

    def __str__(self):
        return self.title

    def _get_session(self) -> Session:
        """Returns a new HTTP session carrying this dumper's headers."""
        session = requests.Session()
        # Copy the values into the session's own headers mapping instead of
        # handing over the class-level dict: otherwise any later change
        # of `session.headers` would leak into `_headers` shared
        # by all the instances.
        session.headers.update(self._headers)
        return session

    def _get_args(self) -> dict:
        """Asks the user for every parameter from `_user_input_map`.

        Returns a dictionary of answers indexed by parameter names.
        """
        return {
            param: get_user_input(hint)
            for param, hint in self._user_input_map.items()
        }

    def _chunks_get_list(self, url: str) -> list[str]:
        """Get video chunks names from playlist file at URL.

        Only the lines which point to `.ts` files are returned
        (a query string, if any, is ignored by the check but is kept
        in the returned value).

        :param url: File URL.
        :raises AssertionError: If the playlist has no `.ts` entries.
        """
        LOGGER.info(f'Getting video chunks from playlist {url} ...')

        playlist = self._get_response_simple(url)

        chunk_names = [
            line
            for line in map(str.strip, playlist.splitlines())
            if line.partition('?')[0].endswith('.ts')
        ]

        assert chunk_names, 'No .ts chunks found in playlist file'

        return chunk_names

    def _chunks_download(
        self,
        *,
        url_video_root: str,
        dump_dir: Path,
        chunk_names: list[str],
        start_chunk: str,
        headers: dict[str, str] = None,
    ) -> None:
        """Downloads the chunks one by one into a directory.

        :param url_video_root: URL the chunk names are relative to.
        :param dump_dir: Directory to put chunk files into.
        :param chunk_names: Chunk names as found in the playlist.
        :param start_chunk: Name of the chunk to start from. The chunks
            preceding it are skipped (all of them, if the name is not
            in `chunk_names`). An empty string means download everything.
        :param headers: Additional headers for chunk requests.
        """
        chunks_total = len(chunk_names)
        url_root = url_video_root.rstrip('/')
        skip_until = start_chunk

        for idx, chunk_name in enumerate(chunk_names, 1):

            if chunk_name == skip_until:
                skip_until = ''  # clear to allow further download

            if skip_until:
                continue

            percent = round(idx * 100 / chunks_total, 1)

            LOGGER.info(f'Get {idx}/{chunks_total} ({chunk_name}) [{percent}%] ...')

            chunk_url = f'{url_root}/{chunk_name}'

            with self._session.get(chunk_url, headers=headers or {}, stream=True) as r:
                r.raise_for_status()
                # A query string is not a part of the file name.
                with open(dump_dir / chunk_name.partition('?')[0], 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)

            if self._sleepy:
                # Random short pause between the requests
                # (presumably not to hammer the server).
                sleep(choice([1, 0.5, 0.7, 0.6]))

    def _video_concat(self, path: Path) -> Path:
        """Joins all the `.ts` files from a directory into one mp4 file.

        Requires a POSIX shell and ffmpeg.

        :param path: Directory with downloaded chunks.
        :return: Path to the resulting video file inside `path`.
        :raises subprocess.CalledProcessError: If a command fails.
        """
        LOGGER.info('Concatenating video ...')

        fname_video = 'all_chunks.mp4'
        fname_index = 'all_chunks.txt'
        call = partial(subprocess.check_call, cwd=path, shell=True)

        # `sort -V` puts the chunks in natural (version) order: 9 before 10.
        # The index is overwritten (`>`) rather than appended to, so that
        # a leftover index from a previous attempt does not make ffmpeg
        # include every chunk twice.
        call(f'for i in `ls *.ts | sort -V`; do echo "file $i"; done > {fname_index}')
        call(f'ffmpeg -f concat -i {fname_index} -c copy -bsf:a aac_adtstoasc {fname_video}')

        return path / fname_video

    def _get_response_simple(self, url: str, *, json: bool = False) -> str | dict:
        """Returns a text or a dictionary from a URL.

        :param url: URL to request.
        :param json: Whether to decode the response body as JSON.
        :raises requests.HTTPError: On an unsuccessful response status.
        """
        response = self._session.get(url)
        response.raise_for_status()

        if json:
            return response.json()

        return response.text

    def _video_dump(
        self,
        *,
        title: str,
        url_playlist: str,
        url_referer: str,
        start_chunk: str = '',
    ):
        """Downloads a video described by a playlist into `<title>.mp4`.

        Chunks are first put into a `<title>` directory under the current
        working directory; the directory is removed after the video is built.

        :param title: Video title. Used as the name for the target file.
        :param url_playlist: Playlist URL. Must end with `m3u8`.
        :param url_referer: Value for the `Referer` header of chunk requests.
        :param start_chunk: Optional chunk name to continue download from.
        """
        assert url_playlist.endswith('m3u8'), f'No playlist in `{url_playlist}`'

        LOGGER.info(f'Title: {title}')

        chunk_names = self._chunks_get_list(url_playlist)

        LOGGER.info('Downloading video ...')

        dump_dir = Path(title).absolute()
        dump_dir.mkdir(exist_ok=True)

        url_root = url_playlist.rpartition('/')[0]  # strip playlist filename

        self._chunks_download(
            url_video_root=url_root,
            dump_dir=dump_dir,
            chunk_names=chunk_names,
            start_chunk=start_chunk,
            headers={'Referer': url_referer},
        )

        fpath_video_target = Path(f'{title}.mp4').absolute()
        fpath_video = self._video_concat(dump_dir)

        shutil.move(fpath_video, fpath_video_target)
        shutil.rmtree(dump_dir, ignore_errors=True)

        LOGGER.info(f'Video is ready: {fpath_video_target}')

    def _gather(self, *, url_video: str, start_chunk: str = '', **params):
        """Runs video dump. Must be implemented by subclasses.

        :param url_video: Video URL.
        :param start_chunk: Optional chunk name to continue download from.
        :param params: Other dumper-specific parameters.
        """
        raise NotImplementedError

    def run(self):
        """Asks the user for parameters and runs the dump."""
        self._gather(**self._get_args())
class WebinarRu(Dumper):
    """Dumper for recordings from webinar.ru."""

    title = 'webinar.ru'

    _user_input_map = {
        'url_video': 'Video URL (with `record-new/`)',
        'url_playlist': 'Video chunk list URL (with `chunklist.m3u8`)',
    }

    # Base headers plus the origin of the events site.
    _headers = Dumper._headers | {'Origin': 'https://events.webinar.ru'}

    def _gather(self, *, url_video: str, start_chunk: str = '', url_playlist: str = '', **params):
        """Runs video dump.

        The session ID and the record access token are taken from the part
        of the video URL following `record-new/`. They are used to request
        the record manifest, whose `name` becomes the video title.

        :param url_video: Video URL. Hint: has record-new/
        :param url_playlist: Video chunk list URL. Hint: ends with chunklist.m3u8
        :param start_chunk: Optional chunk name to continue download from.
        """
        assert url_playlist, 'Playlist URL must be specified'

        marker = 'record-new/'

        assert marker in url_video, (
            'Unexpected video URL format\n'
            f'Given: {url_video}.\n'
            f'Expected: https://events.webinar.ru/xxx/yyy/record-new/aaa/bbb')

        # <session_id>/<video_id> follow the marker.
        session_id, _, video_id = url_video.partition(marker)[2].partition('/')

        LOGGER.info('Getting manifest ...')

        url_manifest = (
            f'https://events.webinar.ru/api/eventsessions/{session_id}'
            f'/record/isviewable?recordAccessToken={video_id}'
        )
        manifest = self._get_response_simple(url_manifest, json=True)

        self._video_dump(
            title=manifest['name'],
            url_playlist=url_playlist,
            url_referer=url_video,
            start_chunk=start_chunk,
        )
class YandexDisk(Dumper):
    """Dumper for videos shared through Yandex.Disk."""

    title = 'Яндекс.Диск'

    _user_input_map = {
        'url_video': 'Video URL (https://disk.yandex.ru/i/xxx)',
    }

    def _get_manifest(self, url: str) -> dict:
        """Returns the JSON manifest embedded into the page at URL.

        The manifest is the contents of the element
        having `store-prefetch` ID.

        :param url: Video page URL.
        :raises AssertionError: If the page has no manifest.
        """
        LOGGER.debug(f'Getting manifest from {url} ...')

        page = self._get_response_simple(url)
        found = re.search(r'id="store-prefetch">([^<]+)</script', page)

        assert found, f'Manifest not found for {url}'

        return json.loads(found.group(1))

    def _get_playlist_and_title(self, manifest: dict) -> tuple[str, str]:
        """Returns the playlist URL of the largest stream and the video title.

        Only the first resource of the manifest is considered. Streams
        with a non-numeric dimension are ignored. If no stream fits,
        `<none>` is returned instead of a URL.

        :param manifest: Manifest as returned by `_get_manifest`.
        """
        resource = list(manifest['resources'].values())[0]

        best_url = '<none>'
        best_size = 0

        for stream in resource['videoStreams']['videos']:
            size = stream['dimension'].partition('p')[0]

            if size.isnumeric():  # skip e.g. 'adaptive'
                size = int(size)

                if size > best_size:
                    best_size = size
                    best_url = stream['url']

        return best_url, resource['name']

    def _gather(self, *, url_video: str, start_chunk: str = '', **params):
        """Runs video dump.

        :param url_video: Video page URL. Also used as a referer.
        :param start_chunk: Optional chunk name to continue download from.
        """
        url_playlist, title = self._get_playlist_and_title(self._get_manifest(url_video))

        self._video_dump(
            title=title,
            url_playlist=url_playlist,
            url_referer=url_video,
            start_chunk=start_chunk,
        )
if __name__ == '__main__':
    # Show the registered dumpers numbered from 1, let the user pick one
    # by its number, then instantiate and run it.
    print('Available dumpers:')

    numbers = []
    for position, dumper_cls in enumerate(Dumper.registry, start=1):
        numbers.append(str(position))
        print(f'{position} — {dumper_cls.title}')

    selected = get_user_input('Select dumper number', choices=numbers)

    dumper_cls = Dumper.registry[int(selected) - 1]
    dumper_cls().run()