Skip to content

Commit

Permalink
[extractor/twitter] Fix GraphQL and legacy API (yt-dlp#7516)
Browse files Browse the repository at this point in the history
Authored by: bashonly
  • Loading branch information
bashonly authored Jul 6, 2023
1 parent b03fa78 commit 92315c0
Showing 1 changed file with 124 additions and 67 deletions.
191 changes: 124 additions & 67 deletions yt_dlp/extractor/twitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..utils import (
ExtractorError,
dict_get,
filter_dict,
float_or_none,
format_field,
int_or_none,
Expand All @@ -33,8 +34,8 @@ class TwitterBaseIE(InfoExtractor):
_API_BASE = 'https://api.twitter.com/1.1/'
_GRAPHQL_API_BASE = 'https://twitter.com/i/api/graphql/'
_BASE_REGEX = r'https?://(?:(?:www|m(?:obile)?)\.)?(?:twitter\.com|twitter3e4tixl4xyajtrzo62zg5vztmjuricljdp2c5kshju4avyoid\.onion)/'
_AUTH = {'Authorization': 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'}
_guest_token = None
_AUTH = 'AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
_LEGACY_AUTH = 'AAAAAAAAAAAAAAAAAAAAAIK1zgAAAAAA2tUWuhGZ2JceoId5GwYWU5GspY4%3DUq7gzFoCZs1QfwGoVdvSac3IniczZEYXIcDyumCauIXpcAPorE'
_flow_token = None

_LOGIN_INIT_DATA = json.dumps({
Expand Down Expand Up @@ -145,20 +146,21 @@ def _search_dimensions_in_video_url(a_format, video_url):
def is_logged_in(self):
return bool(self._get_cookies(self._API_BASE).get('auth_token'))

def _fetch_guest_token(self, headers, display_id):
headers.pop('x-guest-token', None)
self._guest_token = traverse_obj(self._download_json(
f'{self._API_BASE}guest/activate.json', display_id,
'Downloading guest token', data=b'', headers=headers), 'guest_token')
if not self._guest_token:
def _fetch_guest_token(self, display_id):
guest_token = traverse_obj(self._download_json(
f'{self._API_BASE}guest/activate.json', display_id, 'Downloading guest token', data=b'',
headers=self._set_base_headers(legacy=display_id and self._configuration_arg('legacy_api'))),
('guest_token', {str}))
if not guest_token:
raise ExtractorError('Could not retrieve guest token')
return guest_token

def _set_base_headers(self):
headers = self._AUTH.copy()
csrf_token = try_call(lambda: self._get_cookies(self._API_BASE)['ct0'].value)
if csrf_token:
headers['x-csrf-token'] = csrf_token
return headers
def _set_base_headers(self, legacy=False):
bearer_token = self._LEGACY_AUTH if legacy and not self.is_logged_in else self._AUTH
return filter_dict({
'Authorization': f'Bearer {bearer_token}',
'x-csrf-token': try_call(lambda: self._get_cookies(self._API_BASE)['ct0'].value),
})

def _call_login_api(self, note, headers, query={}, data=None):
response = self._download_json(
Expand All @@ -183,17 +185,18 @@ def _perform_login(self, username, password):
if self.is_logged_in:
return

self._request_webpage('https://twitter.com/', None, 'Requesting cookies')
headers = self._set_base_headers()
self._fetch_guest_token(headers, None)
headers.update({
webpage = self._download_webpage('https://twitter.com/', None, 'Downloading login page')
guest_token = self._search_regex(
r'\.cookie\s*=\s*["\']gt=(\d+);', webpage, 'gt', default=None) or self._fetch_guest_token(None)
headers = {
**self._set_base_headers(),
'content-type': 'application/json',
'x-guest-token': self._guest_token,
'x-guest-token': guest_token,
'x-twitter-client-language': 'en',
'x-twitter-active-user': 'yes',
'Referer': 'https://twitter.com/',
'Origin': 'https://twitter.com',
})
}

def build_login_json(*subtask_inputs):
return json.dumps({
Expand Down Expand Up @@ -285,37 +288,26 @@ def input_dict(subtask_id, text):
self.report_login()

def _call_api(self, path, video_id, query={}, graphql=False):
headers = self._set_base_headers()
if self.is_logged_in:
headers.update({
'x-twitter-auth-type': 'OAuth2Session',
'x-twitter-client-language': 'en',
'x-twitter-active-user': 'yes',
})

for first_attempt in (True, False):
if not self.is_logged_in:
if not self._guest_token:
self._fetch_guest_token(headers, video_id)
headers['x-guest-token'] = self._guest_token

allowed_status = {400, 401, 403, 404} if graphql else {403}
result = self._download_json(
(self._GRAPHQL_API_BASE if graphql else self._API_BASE) + path,
video_id, headers=headers, query=query, expected_status=allowed_status,
note=f'Downloading {"GraphQL" if graphql else "legacy API"} JSON')

if result.get('errors'):
errors = ', '.join(set(traverse_obj(result, ('errors', ..., 'message', {str}))))
if not self.is_logged_in and first_attempt and 'bad guest token' in errors.lower():
self.to_screen('Guest token has expired. Refreshing guest token')
self._guest_token = None
continue
headers = self._set_base_headers(legacy=not graphql and self._configuration_arg('legacy_api'))
headers.update({
'x-twitter-auth-type': 'OAuth2Session',
'x-twitter-client-language': 'en',
'x-twitter-active-user': 'yes',
} if self.is_logged_in else {
'x-guest-token': self._fetch_guest_token(video_id)
})
allowed_status = {400, 401, 403, 404} if graphql else {403}
result = self._download_json(
(self._GRAPHQL_API_BASE if graphql else self._API_BASE) + path,
video_id, headers=headers, query=query, expected_status=allowed_status,
note=f'Downloading {"GraphQL" if graphql else "legacy API"} JSON')

raise ExtractorError(
f'Error(s) while querying API: {errors or "Unknown error"}', expected=True)
if result.get('errors'):
errors = ', '.join(set(traverse_obj(result, ('errors', ..., 'message', {str}))))
raise ExtractorError(
f'Error(s) while querying API: {errors or "Unknown error"}', expected=True)

return result
return result

def _build_graphql_query(self, media_id):
raise NotImplementedError('Method must be implemented to support GraphQL')
Expand Down Expand Up @@ -765,9 +757,9 @@ class TwitterIE(TwitterBaseIE):
'url': 'https://twitter.com/UltimaShadowX/status/1577719286659006464',
'info_dict': {
'id': '1577719286659006464',
'title': 'Ultima | #\u0432\u029f\u043c - Test',
'title': 'Ultima📛 | #вʟм - Test',
'description': 'Test https://t.co/Y3KEZD7Dad',
'uploader': 'Ultima | #\u0432\u029f\u043c',
'uploader': 'Ultima📛 | #вʟм',
'uploader_id': 'UltimaShadowX',
'uploader_url': 'https://twitter.com/UltimaShadowX',
'upload_date': '20221005',
Expand Down Expand Up @@ -825,6 +817,7 @@ class TwitterIE(TwitterBaseIE):
},
'skip': 'Requires authentication',
}, {
# Playlist result only with auth
'url': 'https://twitter.com/Srirachachau/status/1395079556562706435',
'playlist_mincount': 2,
'info_dict': {
Expand Down Expand Up @@ -896,6 +889,7 @@ class TwitterIE(TwitterBaseIE):
},
'add_ie': ['TwitterSpaces'],
'params': {'skip_download': 'm3u8'},
'skip': 'Requires authentication',
}, {
# URL specifies video number but --yes-playlist
'url': 'https://twitter.com/CTVJLaidlaw/status/1600649710662213632/video/1',
Expand Down Expand Up @@ -1009,14 +1003,14 @@ class TwitterIE(TwitterBaseIE):
'timestamp': 1670306984.0,
},
}, {
# url to retweet id, legacy API
# url to retweet id w/ legacy api
'url': 'https://twitter.com/liberdalau/status/1623739803874349067',
'info_dict': {
'id': '1623274794488659969',
'display_id': '1623739803874349067',
'ext': 'mp4',
'title': 'Johnny Bullets - Me after going viral to over 30million people: Whoopsie-daisy',
'description': 'md5:e873616a4a8fe0f93e71872678a672f3',
'description': 'md5:b06864cd3dc2554821cc327f5348485a',
'uploader': 'Johnny Bullets',
'uploader_id': 'Johnnybull3ts',
'uploader_url': 'https://twitter.com/Johnnybull3ts',
Expand All @@ -1028,9 +1022,31 @@ class TwitterIE(TwitterBaseIE):
'thumbnail': r're:https://pbs\.twimg\.com/ext_tw_video_thumb/.+',
'like_count': int,
'repost_count': int,
'comment_count': int,
},
'params': {'extractor_args': {'twitter': {'legacy_api': ['']}}},
}, {
# orig tweet w/ graphql
'url': 'https://twitter.com/liberdalau/status/1623739803874349067',
'info_dict': {
'id': '1623274794488659969',
'display_id': '1623739803874349067',
'ext': 'mp4',
'title': '@[email protected] 🐀 - RT @Johnnybull3ts: Me after going viral to over 30million people: Whoopsie-daisy',
'description': 'md5:9258bdbb54793bdc124fe1cd47e96c6a',
'uploader': '@[email protected] 🐀',
'uploader_id': 'liberdalau',
'uploader_url': 'https://twitter.com/liberdalau',
'age_limit': 0,
'tags': [],
'duration': 8.033,
'timestamp': 1675964711.0,
'upload_date': '20230209',
'thumbnail': r're:https://pbs\.twimg\.com/ext_tw_video_thumb/.+',
'like_count': int,
'view_count': int,
'repost_count': int,
'comment_count': int,
},
}, {
# onion route
'url': 'https://twitter3e4tixl4xyajtrzo62zg5vztmjuricljdp2c5kshju4avyoid.onion/TwitterBlue/status/1484226494708662273',
Expand Down Expand Up @@ -1073,17 +1089,21 @@ def _graphql_to_legacy(self, data, twid):
result = traverse_obj(data, (
'threaded_conversation_with_injections_v2', 'instructions', 0, 'entries',
lambda _, v: v['entryId'] == f'tweet-{twid}', 'content', 'itemContent',
'tweet_results', 'result', ('tweet', None),
), expected_type=dict, default={}, get_all=False)
'tweet_results', 'result', ('tweet', None), {dict},
), default={}, get_all=False) if self.is_logged_in else traverse_obj(
data, ('tweetResult', 'result', {dict}), default={})

if result.get('__typename') not in ('Tweet', 'TweetTombstone', None):
if result.get('__typename') not in ('Tweet', 'TweetTombstone', 'TweetUnavailable', None):
self.report_warning(f'Unknown typename: {result.get("__typename")}', twid, only_once=True)

if 'tombstone' in result:
cause = remove_end(traverse_obj(result, ('tombstone', 'text', 'text', {str})), '. Learn more')
if cause and 'adult content' in cause:
self.raise_login_required(cause)
raise ExtractorError(f'Twitter API says: {cause or "Unknown error"}', expected=True)
elif result.get('__typename') == 'TweetUnavailable':
reason = result.get('reason')
if reason == 'NsfwLoggedOut':
self.raise_login_required('NSFW tweet requires authentication')
raise ExtractorError(reason or 'Requested tweet is unavailable', expected=True)

status = result.get('legacy', {})
status.update(traverse_obj(result, {
Expand Down Expand Up @@ -1134,23 +1154,58 @@ def _build_graphql_query(self, media_id):
'verified_phone_label_enabled': False,
'vibe_api_enabled': True,
},
} if self.is_logged_in else {
'variables': {
'tweetId': media_id,
'withCommunity': False,
'includePromotedContent': False,
'withVoice': False,
},
'features': {
'creator_subscriptions_tweet_preview_api_enabled': True,
'tweetypie_unmention_optimization_enabled': True,
'responsive_web_edit_tweet_api_enabled': True,
'graphql_is_translatable_rweb_tweet_is_translatable_enabled': True,
'view_counts_everywhere_api_enabled': True,
'longform_notetweets_consumption_enabled': True,
'responsive_web_twitter_article_tweet_consumption_enabled': False,
'tweet_awards_web_tipping_enabled': False,
'freedom_of_speech_not_reach_fetch_enabled': True,
'standardized_nudges_misinfo': True,
'tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled': True,
'longform_notetweets_rich_text_read_enabled': True,
'longform_notetweets_inline_media_enabled': True,
'responsive_web_graphql_exclude_directive_enabled': True,
'verified_phone_label_enabled': False,
'responsive_web_media_download_video_enabled': False,
'responsive_web_graphql_skip_user_profile_image_extensions_enabled': False,
'responsive_web_graphql_timeline_navigation_enabled': True,
'responsive_web_enhance_cards_enabled': False
},
'fieldToggles': {
'withArticleRichContentState': False
}
}

def _real_extract(self, url):
twid, selected_index = self._match_valid_url(url).group('id', 'index')
if self._configuration_arg('legacy_api') and not self.is_logged_in:
if not self.is_logged_in and self._configuration_arg('legacy_api'):
status = traverse_obj(self._call_api(f'statuses/show/{twid}.json', twid, {
'cards_platform': 'Web-12',
'include_cards': 1,
'include_reply_count': 1,
'include_user_entities': 0,
'tweet_mode': 'extended',
}), 'retweeted_status', None)
elif not self.is_logged_in:
status = self._graphql_to_legacy(
self._call_graphql_api('2ICDjqPd81tulZcYrtpTuQ/TweetResultByRestId', twid), twid)
else:
result = self._call_graphql_api('zZXycP0V6H7m-2r0mOnFcA/TweetDetail', twid)
status = self._graphql_to_legacy(result, twid)
status = self._graphql_to_legacy(
self._call_graphql_api('zZXycP0V6H7m-2r0mOnFcA/TweetDetail', twid), twid)

title = description = status['full_text'].replace('\n', ' ')
title = description = traverse_obj(
status, (('full_text', 'text'), {lambda x: x.replace('\n', ' ')}), get_all=False) or ''
# strip 'https -_t.co_BJYgOjSeGA' junk from filenames
title = re.sub(r'\s+(https?://[^ ]+)', '', title)
user = status.get('user') or {}
Expand All @@ -1177,11 +1232,10 @@ def _real_extract(self, url):
def extract_from_video_info(media):
media_id = traverse_obj(media, 'id_str', 'id', expected_type=str_or_none)
self.write_debug(f'Extracting from video info: {media_id}')
video_info = media.get('video_info') or {}

formats = []
subtitles = {}
for variant in video_info.get('variants', []):
for variant in traverse_obj(media, ('video_info', 'variants', ...)):
fmts, subs = self._extract_variant_formats(variant, twid)
subtitles = self._merge_subtitles(subtitles, subs)
formats.extend(fmts)
Expand All @@ -1206,7 +1260,7 @@ def add_thumbnail(name, size):
'subtitles': subtitles,
'thumbnails': thumbnails,
'view_count': traverse_obj(media, ('mediaStats', 'viewCount', {int_or_none})),
'duration': float_or_none(video_info.get('duration_millis'), 1000),
'duration': float_or_none(traverse_obj(media, ('video_info', 'duration_millis')), 1000),
# The codec of http formats are unknown
'_format_sort_fields': ('res', 'br', 'size', 'proto'),
}
Expand Down Expand Up @@ -1291,7 +1345,8 @@ def get_binding_value(k):
if self._yes_playlist(twid, selected_index, video_label='URL-specified video number'):
selected_entries = (*map(extract_from_video_info, videos), *extract_from_card_info(status.get('card')))
else:
desired_obj = traverse_obj(status, ('extended_entities', 'media', int(selected_index) - 1, {dict}))
desired_obj = traverse_obj(status, (
(None, 'quoted_status'), 'extended_entities', 'media', int(selected_index) - 1, {dict}), get_all=False)
if not desired_obj:
raise ExtractorError(f'Video #{selected_index} is unavailable', expected=True)
elif desired_obj.get('type') != 'video':
Expand Down Expand Up @@ -1481,6 +1536,8 @@ def _build_graphql_query(self, space_id):

def _real_extract(self, url):
space_id = self._match_id(url)
if not self.is_logged_in:
self.raise_login_required('Twitter Spaces require authentication')
space_data = self._call_graphql_api('HPEisOmj1epUNLCWTYhUWw/AudioSpaceById', space_id)['audioSpace']
if not space_data:
raise ExtractorError('Twitter Space not found', expected=True)
Expand Down

0 comments on commit 92315c0

Please sign in to comment.