diff --git a/resources/lib/youtube_plugin/youtube/client/youtube.py b/resources/lib/youtube_plugin/youtube/client/youtube.py index 2abb72a36..3de53a9e6 100644 --- a/resources/lib/youtube_plugin/youtube/client/youtube.py +++ b/resources/lib/youtube_plugin/youtube/client/youtube.py @@ -26,6 +26,7 @@ datetime_parser, strip_html_from_text, to_unicode, + wait, ) @@ -1514,55 +1515,19 @@ def get_my_subscriptions(self, 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'en-US,en;q=0.7,de;q=0.3' } - namespaces = { - 'atom': 'http://www.w3.org/2005/Atom', - 'yt': 'http://www.youtube.com/xml/schemas/2015', - 'media': 'http://search.yahoo.com/mrss/', - } - def _feed_items(channel_id, - encode=not current_system_version.compatible(19, 0), - headers=headers, - ns=namespaces): - response = self.request( + def _get_feed(channel_id, headers=headers): + return channel_id, self.request( 'https://www.youtube.com/feeds/videos.xml?channel_id=' + channel_id, headers=headers, ) - if not response: - return None - - response.encoding = 'utf-8' - xml_data = to_unicode(response.content) - xml_data = xml_data.replace('\n', '') - if encode: - xml_data = to_str(xml_data) - - root = ET.fromstring(xml_data) - channel_name = (root.findtext('atom:title', '', ns) - .lower().replace(',', '')) - return [{ - 'kind': 'youtube#video', - 'id': item.findtext('yt:videoId', '', ns), - 'snippet': { - 'title': item.findtext('atom:title', '', ns), - 'channelId': channel_id, - }, - '_channel': channel_name, - '_timestamp': datetime_parser.since_epoch( - datetime_parser.strptime( - item.findtext('atom:published', '', ns) - ) - ), - '_partial': True, - } for item in root.findall('atom:entry', ns)] def _threaded_fetch(args, kwargs, output, worker, - input_lock, - output_lock): + input_lock): while 1: if input_lock.acquire(blocking=False): _args = args.pop() if args else [] @@ -1571,34 +1536,31 @@ def _threaded_fetch(args, break input_lock.release() else: + wait(0.1) continue try: - _output = worker(*_args, **_kwargs) + key, _output = worker(*_args, **_kwargs) except Exception as exc: self._context.log_error('threaded_fetch error: |{exc}|' .format(exc=exc)) continue - if _output and output_lock.acquire(blocking=True): - output.extend(_output) - output_lock.release() + if _output: + output[key] = _output input_lock.release() payload = { 'args': list(set(sub_channel_ids)), 'kwargs': None, - 'output': [], - 'worker': _feed_items, + 'output': {}, + 'worker': _get_feed, 'input_lock': threading.Lock(), - 'output_lock': threading.Lock(), } threads = [] num_threads = 0 max_threads = min(32, (cpu_count() or 1) + 4) - while (payload['args'] - or payload['kwargs'] - or payload['output_lock'].locked()): + while payload['args'] or payload['kwargs']: if num_threads >= max_threads: continue thread = threading.Thread( @@ -1612,7 +1574,35 @@ def _threaded_fetch(args, for thread in threads: thread.join(30) - items = payload['output'] + namespaces = { + 'atom': 'http://www.w3.org/2005/Atom', + 'yt': 'http://www.youtube.com/xml/schemas/2015', + 'media': 'http://search.yahoo.com/mrss/', + } + encode = not current_system_version.compatible(19, 0) + items = [] + for channel_id, feed in payload['output'].items(): + feed.encoding = 'utf-8' + feed = to_unicode(feed.content).replace('\n', '') + + root = ET.fromstring(to_str(feed) if encode else feed) + channel_name = (root.findtext('atom:title', '', namespaces) + .lower().replace(',', '')) + items.extend([{ + 'kind': 'youtube#video', + 'id': item.findtext('yt:videoId', '', namespaces), + 'snippet': { + 'title': item.findtext('atom:title', '', namespaces), + 'channelId': channel_id, + }, + '_channel': channel_name, + '_timestamp': datetime_parser.since_epoch( + datetime_parser.strptime( + item.findtext('atom:published', '', namespaces) + ) + ), + '_partial': True, + } for item in root.findall('atom:entry', namespaces)]) # Update cache cache.set_item(cache_items_key, items)