Skip to content

Commit

Permalink
chore(core): unify styling in search functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Jordan Shatford <[email protected]>
  • Loading branch information
jordanshatford committed Oct 13, 2023
1 parent 110b890 commit bbb22b0
Showing 1 changed file with 44 additions and 44 deletions.
88 changes: 44 additions & 44 deletions core/ydcore/search/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,50 @@ class SearchMode:

class SearchCore:
response: Any = None
responseSource: Any = None
resultComponents: list[Any] = []
response_source: Any = None
result_components: list[Any] = []

def __init__(
self, query: str,
language: str, region: str,
searchPreferences: str, timeout: int | None,
search_mode: str, timeout: int | None,
):
self.query = query
self.language = language
self.region = region
self.searchPreferences = searchPreferences
self.search_mode = search_mode
self.timeout = timeout
self.continuationKey: str | None = None
self.continuation_key: str | None = None

def sync_create(self):
self._makeRequest()
self._parseSource()
self._make_request()
self._parse_source()

def result(self) -> dict[str, Any]:
'''Returns the search result.'''
return {'result': self.resultComponents}
return {'result': self.result_components}

def _next(self) -> bool:
if self.continuationKey:
if self.continuation_key:
self.response = None
self.responseSource = None
self.resultComponents: list[Any] = []
self._makeRequest()
self._parseSource()
self._getComponents()
self.response_source = None
self.result_components: list[Any] = []
self._make_request()
self._parse_source()
self._get_components()
return True
else:
return False

def _getComponents(self) -> None:
self.resultComponents = []
for element in self.responseSource:
def _get_components(self) -> None:
self.result_components = []
for element in self.response_source:
if 'videoRenderer' in element.keys():
video = element['videoRenderer']
self.resultComponents.append(self._getVideoComponent(video))
self.result_components.append(self._get_video_component(video))

def _makeRequest(self) -> None:
requestBody: dict[str, Any] = {
def _make_request(self) -> None:
request_body: dict[str, Any] = {
'context': {
'client': {
'clientName': 'WEB',
Expand All @@ -74,24 +74,24 @@ def _makeRequest(self) -> None:
},
},
}
requestBody['query'] = self.query
requestBody['context']['client'].update({
request_body['query'] = self.query
request_body['context']['client'].update({
'hl': self.language,
'gl': self.region,
})
if self.searchPreferences:
requestBody['params'] = self.searchPreferences
if self.continuationKey:
requestBody['continuation'] = self.continuationKey
requestBodyBytes = json.dumps(requestBody).encode('utf-8')
if self.search_mode:
request_body['params'] = self.search_mode
if self.continuation_key:
request_body['continuation'] = self.continuation_key
request_body_bytes = json.dumps(request_body).encode('utf-8')
request = Request(
'https://www.youtube.com/youtubei/v1/search' + '?' + urlencode({
'key': API_KEY,
}),
data=requestBodyBytes,
data=request_body_bytes,
headers={
'Content-Type': 'application/json; charset=utf-8',
'Content-Length': str(len(requestBodyBytes)),
'Content-Length': str(len(request_body_bytes)),
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36', # noqa: E501
},
)
Expand All @@ -102,17 +102,17 @@ def _makeRequest(self) -> None:
except Exception:
raise Exception('ERROR: Could not make request.')

def _parseSource(self) -> None:
def _parse_source(self) -> None:
try:
if not self.continuationKey:
responseContent = self._getValue(
if not self.continuation_key:
response_content = self._getValue(
json.loads(self.response), [
'contents', 'twoColumnSearchResultsRenderer',
'primaryContents', 'sectionListRenderer', 'contents',
],
)
else:
responseContent = self._getValue(
response_content = self._getValue(
json.loads(
self.response,
), [
Expand All @@ -121,38 +121,38 @@ def _parseSource(self) -> None:
'continuationItems',
],
)
if responseContent:
for element in responseContent:
if response_content:
for element in response_content:
if 'itemSectionRenderer' in element.keys():
self.responseSource = self._getValue(
self.response_source = self._getValue(
element, ['itemSectionRenderer', 'contents'],
)
if 'continuationItemRenderer' in element.keys():
self._getContinuation(element)
self._get_continuation(element)
else:
self.responseSource = self._getValue(
self.response_source = self._getValue(
json.loads(
self.response,
), [
'contents', 'twoColumnSearchResultsRenderer',
'primaryContents', 'richGridRenderer', 'contents',
],
)
if self.responseSource:
self._getContinuation(self.responseSource[-1])
if self.response_source:
self._get_continuation(self.response_source[-1])
except Exception:
raise Exception('ERROR: Could not parse YouTube response.')

def _getContinuation(self, element: dict[Any, Any]) -> None:
def _get_continuation(self, element: dict[Any, Any]) -> None:
temp = self._getValue(
element, [
'continuationItemRenderer',
'continuationEndpoint', 'continuationCommand', 'token',
],
)
self.continuationKey = str(temp) if temp is not None else temp
self.continuation_key = str(temp) if temp is not None else temp

def _getVideoComponent(self, video: dict[Any, Any]) -> dict[Any, Any]:
def _get_video_component(self, video: dict[Any, Any]) -> dict[Any, Any]:
component: dict[Any, Any] = {
'type': 'video',
'id': self._getValue(video, ['videoId']),
Expand Down Expand Up @@ -245,7 +245,7 @@ def __init__(
SearchMode.videos, timeout,
)
self.sync_create()
self._getComponents()
self._get_components()

def next(self) -> bool:
return self._next()

1 comment on commit bbb22b0

@vercel
Copy link

@vercel vercel bot commented on bbb22b0 Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.