Skip to content

Commit

Permalink
Fix downloading Kakao stickers from share link
Browse files Browse the repository at this point in the history
  • Loading branch information
laggykiller committed Jul 9, 2024
1 parent e603367 commit 8dc46a7
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 142 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ anyio~=4.4.0
apngasm_python~=1.3.1
av~=12.2.0
beautifulsoup4~=4.12.3
chromedriver_autoinstaller~=0.6.4
rookiepy~=0.5.2
imagequant~=1.1.1
memory-tempfile~=2.2.3
Expand Down
4 changes: 2 additions & 2 deletions src/sticker_convert/downloaders/download_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ def download_file(
response = requests.get(
url, stream=True, allow_redirects=True, **kwargs
)
total_length = int(response.headers.get("content-length")) # type: ignore

if not response.ok:
return b""
total_length = int(response.headers.get("content-length")) # type: ignore

self.cb.put(f"Downloading {url}")

if show_progress:
Expand Down
232 changes: 92 additions & 140 deletions src/sticker_convert/downloaders/download_kakao.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,111 +3,59 @@

import itertools
import json
import re
import zipfile
from io import BytesIO
import webbrowser
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, List, Optional, Tuple, cast
from urllib.parse import urlparse

import chromedriver_autoinstaller # type: ignore
import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

from sticker_convert.downloaders.download_base import DownloadBase
from sticker_convert.job_option import CredOption
from sticker_convert.utils.callback import CallbackProtocol, CallbackReturn
from sticker_convert.utils.files.metadata_handler import MetadataHandler
from sticker_convert.utils.media.decrypt_kakao import DecryptKakao


def search_bracket(text: str, open_bracket: str = "{", close_bracket: str = "}") -> int:
depth = 0
is_str = False

for count, char in enumerate(text):
if char == '"':
is_str = not is_str

if is_str is False:
if char == open_bracket:
depth += 1
elif char == close_bracket:
depth -= 1

if depth == 0:
return count

return -1
JSINJECT = """
class osclass {
android = true;
}
class uaclass {
os = new osclass();
}
class util {
static userAgent() {
return new uaclass();
}
}
class daumtools {
static web2app(dataDict) {
return dataDict['urlScheme'];
}
}
"""

HTMLPAGE = """
<html>
<body>
<p id="p1"></p>
<script>
{}
</script>
</body>
</html>
"""


class MetadataKakao:
@staticmethod
def get_info_from_share_link(url: str) -> Tuple[Optional[str], Optional[str]]:
headers = {"User-Agent": "Android"}

response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.content.decode("utf-8", "ignore"), "html.parser")

pack_title_tag = soup.find("title") # type: ignore
if not pack_title_tag:
return None, None

pack_title: str = pack_title_tag.string # type: ignore

app_scheme_link_tag = soup.find("a", id="app_scheme_link") # type: ignore
assert isinstance(app_scheme_link_tag, Tag)

item_code_fake = cast(str, app_scheme_link_tag["data-i"])

js = ""
for script_tag in soup.find_all("script"):
js = script_tag.string
if js and "emoticonDeepLink" in js:
break
if "emoticonDeepLink" not in js:
return None, None

func_start_pos = js.find("function emoticonDeepLink(")
js = js[func_start_pos:]
bracket_start_pos = js.find("{")
func_end_pos = search_bracket(js[bracket_start_pos:]) + bracket_start_pos
js = js[bracket_start_pos + 1 : func_end_pos]
js = js.split(";")[0]

minus_num_regex = re.search(r"\-(.*?)\^", js)
if not minus_num_regex:
return None, None
minus_num_str = minus_num_regex.group(1)
if not minus_num_str.isnumeric():
return None, None
minus_num = int(minus_num_str)

xor_num_regex = re.search(r"\^(.*?)\)", js)
if not xor_num_regex:
return None, None
xor_num_str = xor_num_regex.group(1)
if not xor_num_str.isnumeric():
return None, None
xor_num = int(xor_num_str)

item_code = str(int(item_code_fake) - minus_num ^ xor_num)

# https://github.com/Nuitka/Nuitka/issues/385
# js2py not working if compiled by nuitka
# web2app_start_pos = js.find("daumtools.web2app(")
# js = js[:web2app_start_pos] + "return a;}"
# get_item_code = js2py.eval_js(js) # type: ignore
# kakao_scheme_link = cast(
# str,
# get_item_code(
# "kakaotalk://store/emoticon/${i}?referer=share_link", item_code_fake
# ),
# )
# item_code = urlparse(kakao_scheme_link).path.split("/")[-1]

return pack_title, item_code

@staticmethod
def get_item_code(title_ko: str, auth_token: str) -> Optional[str]:
headers = {
Expand Down Expand Up @@ -177,15 +125,67 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.pack_info_unauthed: Optional[dict[str, Any]] = None
self.pack_info_authed: Optional[dict[str, Any]] = None

def get_info_from_share_link(self, url: str) -> Tuple[Optional[str], Optional[str]]:
headers = {"User-Agent": "Android"}

response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.content.decode("utf-8", "ignore"), "html.parser")

pack_title_tag = soup.find("title") # type: ignore
if not pack_title_tag:
return None, None

pack_title: str = pack_title_tag.string # type: ignore

js = ""
for script_tag in soup.find_all("script"):
js = script_tag.string
if js and "daumtools.web2app" in js:
break
if "daumtools.web2app" not in js:
return None, None

js = js.replace(
"daumtools.web2app",
'document.getElementById("p1").innerHTML = daumtools.web2app',
)
js = JSINJECT + js

with TemporaryDirectory() as tempdir:
html_page_path = Path(tempdir, "page.html")
html_page = HTMLPAGE.format(js)
with open(html_page_path, "w+") as f:
f.write(html_page)

try:
chromedriver_autoinstaller.install()
chrome_options = Options()
chrome_options.add_argument("--headless") # type: ignore
driver = webdriver.Chrome(options=chrome_options)
driver.get(html_page_path.as_posix())
wait = WebDriverWait(driver, 10)
wait.until(EC.text_to_be_present_in_element((By.ID, "p1"), "kakaotalk")) # type: ignore
kakao_url_elm = driver.find_element(By.ID, "p1")
kakao_url = cast(str, kakao_url_elm.text) # type: ignore
driver.close()
except ValueError:
webbrowser.open(html_page_path.as_posix())
prompt = "Chrome not installed, using manual method.\n"
prompt += "Please copy and paste the url you see in the browser"
self.cb.put(("ask_str", (prompt,), None))
kakao_url = cast(str, self.cb_return.get_response())

item_code = urlparse(kakao_url).path.split("/")[2]

return pack_title, item_code

def download_stickers_kakao(self) -> bool:
self.auth_token = None
if self.opt_cred:
self.auth_token = self.opt_cred.kakao_auth_token

if urlparse(self.url).netloc == "emoticon.kakao.com":
self.pack_title, item_code = MetadataKakao.get_info_from_share_link(
self.url
)
self.pack_title, item_code = self.get_info_from_share_link(self.url)

if item_code:
return self.download_animated(item_code)
Expand Down Expand Up @@ -268,14 +268,6 @@ def download_animated(self, item_code: str) -> bool:
self.out_dir, title=self.pack_title, author=self.author
)

success = self.download_animated_zip(item_code)
if not success:
self.cb.put("Trying to download one by one")
success = self.download_animated_files(item_code)

return success

def download_animated_files(self, item_code: str) -> bool:
play_exts = [".webp", ".gif", ".png", ""]
play_types = ["emot", "emoji", ""] # emot = normal; emoji = mini
play_path_format = None
Expand Down Expand Up @@ -376,46 +368,6 @@ def download_animated_files(self, item_code: str) -> bool:

return True

def download_animated_zip(self, item_code: str) -> bool:
pack_url = f"http://item.kakaocdn.net/dw/{item_code}.file_pack.zip"

zip_file = self.download_file(pack_url)
if zip_file:
self.cb.put(f"Downloaded {pack_url}")
else:
self.cb.put(f"Cannot download {pack_url}")
return False

with zipfile.ZipFile(BytesIO(zip_file)) as zf:
self.cb.put("Unzipping...")
self.cb.put(
(
"bar",
None,
{"set_progress_mode": "determinate", "steps": len(zf.namelist())},
)
)

for num, f_path in enumerate(sorted(zf.namelist())):
ext = Path(f_path).suffix

if ext in (".gif", ".webp"):
data = DecryptKakao.xor_data(zf.read(f_path))
self.cb.put(f"Decrypted {f_path}")
else:
data = zf.read(f_path)
self.cb.put(f"Read {f_path}")

out_path = Path(self.out_dir, str(num).zfill(3) + ext)
with open(out_path, "wb") as f:
f.write(data)

self.cb.put("update_bar")

self.cb.put(f"Finished getting {pack_url}")

return True

@staticmethod
def start(
url: str,
Expand Down

0 comments on commit 8dc46a7

Please sign in to comment.