Skip to content

Commit

Permalink
Improve typing; Fix random deadlocks esp when using scale_filter=near…
Browse files Browse the repository at this point in the history
…est; Add `box` and `hamming` scale_filter
  • Loading branch information
laggykiller committed Mar 19, 2024
1 parent 0f12a06 commit 282b3bb
Show file tree
Hide file tree
Showing 20 changed files with 83 additions and 112 deletions.
18 changes: 8 additions & 10 deletions src/sticker_convert/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,20 @@
from math import ceil, floor
from pathlib import Path
from queue import Queue
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union, cast

import numpy as np
from PIL import Image

from sticker_convert.job_option import CompOption
from sticker_convert.utils.callback import Callback, CallbackReturn
from sticker_convert.utils.callback import Callback, CallbackReturn, CbQueueItemType
from sticker_convert.utils.files.cache_store import CacheStore
from sticker_convert.utils.media.codec_info import CodecInfo
from sticker_convert.utils.media.format_verify import FormatVerify

if TYPE_CHECKING:
from av.video.plane import VideoPlane # type: ignore

CbQueueItemType = Union[
Tuple[str, Optional[Tuple[str]], Optional[Dict[str, str]]],
str,
None,
]

MSG_START_COMP = "[I] Start compressing {} -> {}"
MSG_SKIP_COMP = "[S] Compatible file found, skip compress and just copy {} -> {}"
MSG_COMP = (
Expand Down Expand Up @@ -89,7 +83,7 @@ def __init__(
opt_comp: CompOption,
cb: "Union[Queue[CbQueueItemType], Callback]",
# cb_return: CallbackReturn
):
) -> None:
self.in_f: Union[bytes, Path]
if isinstance(in_f, Path):
self.in_f = in_f
Expand Down Expand Up @@ -514,11 +508,15 @@ def frames_resize(
) -> "List[np.ndarray[Any, Any]]":
frames_out: "List[np.ndarray[Any, Any]]" = []

resample: Literal[0, 1, 2, 3]
resample: Literal[0, 1, 2, 3, 4, 5]
if self.opt_comp.scale_filter == "nearest":
resample = Image.NEAREST
elif self.opt_comp.scale_filter == "box":
resample = Image.BOX
elif self.opt_comp.scale_filter == "bilinear":
resample = Image.BILINEAR
elif self.opt_comp.scale_filter == "hamming":
resample = Image.HAMMING
elif self.opt_comp.scale_filter == "bicubic":
resample = Image.BICUBIC
elif self.opt_comp.scale_filter == "lanczos":
Expand Down
35 changes: 10 additions & 25 deletions src/sticker_convert/downloaders/download_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

from pathlib import Path
from queue import Queue
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, List, Optional, Tuple, Union

import requests

from sticker_convert.job_option import CredOption
from sticker_convert.utils.callback import Callback, CallbackReturn
from sticker_convert.utils.callback import Callback, CallbackReturn, CbQueueItemType


class DownloadBase:
Expand All @@ -18,35 +18,20 @@ def __init__(
out_dir: Path,
opt_cred: Optional[CredOption],
cb: Union[
Queue[
Union[
Tuple[str, Optional[Tuple[str]], Optional[Dict[str, Any]]],
str,
None,
]
],
Queue[CbQueueItemType],
Callback,
],
cb_return: CallbackReturn,
):
self.url: str = url
self.out_dir: Path = out_dir
self.opt_cred: Optional[CredOption] = opt_cred
self.cb: Union[
Queue[
Union[
Tuple[str, Optional[Tuple[str]], Optional[Dict[str, Any]]],
str,
None,
]
],
Callback,
] = cb
self.cb_return: CallbackReturn = cb_return
) -> None:
self.url = url
self.out_dir = out_dir
self.opt_cred = opt_cred
self.cb = cb
self.cb_return = cb_return

def download_multiple_files(
self, targets: List[Tuple[str, Path]], retries: int = 3, **kwargs: Any
):
) -> None:
# targets format: [(url1, dest2), (url2, dest2), ...]
self.cb.put(
("bar", None, {"set_progress_mode": "determinate", "steps": len(targets)})
Expand Down
3 changes: 1 addition & 2 deletions src/sticker_convert/downloaders/download_kakao.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
from bs4 import BeautifulSoup
from bs4.element import Tag

from sticker_convert.converter import CbQueueItemType
from sticker_convert.downloaders.download_base import DownloadBase
from sticker_convert.job_option import CredOption
from sticker_convert.utils.callback import Callback, CallbackReturn
from sticker_convert.utils.callback import Callback, CallbackReturn, CbQueueItemType
from sticker_convert.utils.files.metadata_handler import MetadataHandler
from sticker_convert.utils.media.decrypt_kakao import DecryptKakao

Expand Down
5 changes: 2 additions & 3 deletions src/sticker_convert/downloaders/download_line.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
from bs4 import BeautifulSoup
from PIL import Image

from sticker_convert.converter import CbQueueItemType
from sticker_convert.downloaders.download_base import DownloadBase
from sticker_convert.job_option import CredOption
from sticker_convert.utils.auth.get_line_auth import GetLineAuth
from sticker_convert.utils.callback import Callback, CallbackReturn
from sticker_convert.utils.callback import Callback, CallbackReturn, CbQueueItemType
from sticker_convert.utils.files.metadata_handler import MetadataHandler
from sticker_convert.utils.media.apple_png_normalize import ApplePngNormalize

Expand Down Expand Up @@ -227,7 +226,7 @@ def decompress(
num: int,
prefix: str = "",
suffix: str = "",
):
) -> None:
data = zf.read(f_path)
ext = Path(f_path).suffix
if ext == ".png" and not self.is_emoji and int() < 775:
Expand Down
3 changes: 1 addition & 2 deletions src/sticker_convert/downloaders/download_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
from signalstickers_client.errors import SignalException # type: ignore
from signalstickers_client.models import StickerPack # type: ignore

from sticker_convert.converter import CbQueueItemType
from sticker_convert.downloaders.download_base import DownloadBase
from sticker_convert.job_option import CredOption
from sticker_convert.utils.callback import Callback, CallbackReturn
from sticker_convert.utils.callback import Callback, CallbackReturn, CbQueueItemType
from sticker_convert.utils.files.metadata_handler import MetadataHandler
from sticker_convert.utils.media.codec_info import CodecInfo

Expand Down
3 changes: 1 addition & 2 deletions src/sticker_convert/downloaders/download_telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
from telegram import Bot
from telegram.error import TelegramError

from sticker_convert.converter import CbQueueItemType
from sticker_convert.downloaders.download_base import DownloadBase
from sticker_convert.job_option import CredOption
from sticker_convert.utils.callback import Callback, CallbackReturn
from sticker_convert.utils.callback import Callback, CallbackReturn, CbQueueItemType
from sticker_convert.utils.files.metadata_handler import MetadataHandler


Expand Down
2 changes: 1 addition & 1 deletion src/sticker_convert/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ def cb_bar(
steps: int = 0,
update_bar: bool = False,
**kwargs: Any,
):
) -> None:
with self.bar_lock:
self.progress_frame.update_progress_bar(
set_progress_mode, steps, update_bar, *args, **kwargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def update_progress_bar(
set_progress_mode: Optional[str] = None,
steps: int = 0,
update_bar: bool = False,
):
) -> None:
if update_bar and self.progress_bar_cli:
self.progress_bar_cli.update()
self.progress_bar["value"] += 100 / self.progress_bar_steps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,9 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.gui.scale_filter_var,
self.gui.scale_filter_var.get(),
"nearest",
"box",
"bilinear",
"hamming",
"bicubic",
"lanczos",
bootstyle="secondary", # type: ignore
Expand Down
79 changes: 32 additions & 47 deletions src/sticker_convert/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import traceback
from datetime import datetime
from multiprocessing import Process, Value
from multiprocessing.managers import SyncManager
from multiprocessing.managers import SyncManager, ListProxy
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import Any, Callable, Dict, Generator, Iterator, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, Generator, Iterator, List, Optional, Tuple
from urllib.parse import urlparse

from sticker_convert.converter import StickerConvert
Expand All @@ -24,11 +24,13 @@
from sticker_convert.uploaders.upload_signal import UploadSignal
from sticker_convert.uploaders.upload_telegram import UploadTelegram
from sticker_convert.uploaders.xcode_imessage import XcodeImessage
from sticker_convert.utils.callback import CallbackReturn
from sticker_convert.utils.callback import CallbackReturn, CbQueueItemType
from sticker_convert.utils.files.json_resources_loader import OUTPUT_JSON
from sticker_convert.utils.files.metadata_handler import MetadataHandler
from sticker_convert.utils.media.codec_info import CodecInfo

CbQueueType = Queue[CbQueueItemType]
WorkListType = ListProxy[Optional[Tuple[Callable[..., Any], Tuple[Any, ...]]]]

class Executor:
def __init__(
Expand All @@ -38,7 +40,7 @@ def __init__(
cb_bar: Callable[..., None],
cb_ask_bool: Callable[..., bool],
cb_ask_str: Callable[..., str],
):
) -> None:
self.cb_msg = cb_msg
self.cb_msg_block = cb_msg_block
self.cb_bar = cb_bar
Expand All @@ -47,19 +49,11 @@ def __init__(

self.manager = SyncManager()
self.manager.start()
self.work_queue: Queue[Optional[Tuple[Callable[..., Any], Tuple[Any, ...]]]] = (
self.manager.Queue()
)
# Using list instead of queue for work_list as it can cause random deadlocks
# Especially when using scale_filter=nearest
self.work_list: WorkListType = self.manager.list()
self.results_queue: Queue[Any] = self.manager.Queue()
self.cb_queue: Queue[
Union[
Tuple[
Optional[str], Optional[Tuple[Any, ...]], Optional[Dict[str, str]]
],
str,
None,
]
] = self.manager.Queue()
self.cb_queue: CbQueueType = self.manager.Queue()
self.cb_return = CallbackReturn()
self.processes: List[Process] = []

Expand All @@ -76,16 +70,9 @@ def __init__(

def cb_thread(
self,
cb_queue: Queue[
Union[
Tuple[
Optional[str], Optional[Tuple[Any, ...]], Optional[Dict[str, str]]
],
str,
]
],
cb_queue: CbQueueType,
cb_return: CallbackReturn,
):
) -> None:
for i in iter(cb_queue.get, None):
if isinstance(i, tuple):
action = i[0]
Expand Down Expand Up @@ -118,19 +105,17 @@ def cb_thread(

@staticmethod
def worker(
work_queue: Queue[Optional[Tuple[Callable[..., Any], Tuple[Any, ...]]]],
work_list: WorkListType,
results_queue: Queue[Any],
cb_queue: Queue[
Union[
Tuple[
Optional[str], Optional[Tuple[Any, ...]], Optional[Dict[str, str]]
],
str,
]
],
cb_queue: CbQueueType,
cb_return: CallbackReturn,
):
for work_func, work_args in iter(work_queue.get, None):
) -> None:
while True:
work = work_list.pop(0)
if work is None:
break
else:
work_func, work_args = work
try:
results = work_func(*work_args, cb_queue, cb_return)
results_queue.put(results)
Expand All @@ -147,18 +132,17 @@ def worker(
e += traceback.format_exc()
e += "#####################"
cb_queue.put(e)
work_queue.put(None)
work_list.append(None)

def start_workers(self, processes: int = 1) -> None:
# Would contain None from previous run
while not self.work_queue.empty():
self.work_queue.get()
self.work_list = self.manager.list()

for _ in range(processes):
process = Process(
target=Executor.worker,
args=(
self.work_queue,
self.work_list,
self.results_queue,
self.cb_queue,
self.cb_return,
Expand All @@ -172,10 +156,10 @@ def start_workers(self, processes: int = 1) -> None:
def add_work(
self, work_func: Callable[..., Any], work_args: Tuple[Any, ...]
) -> None:
self.work_queue.put((work_func, work_args))
self.work_list.append((work_func, work_args))

def join_workers(self) -> None:
self.work_queue.put(None)
self.work_list.append(None)
try:
for process in self.processes:
process.join()
Expand All @@ -188,8 +172,7 @@ def join_workers(self) -> None:

def kill_workers(self, *_: Any, **__: Any) -> None:
self.is_cancel_job.value = 1 # type: ignore
while not self.work_queue.empty():
self.work_queue.get()
self.work_list = self.manager.list()

for process in self.processes:
if platform.system() == "Windows":
Expand All @@ -213,7 +196,7 @@ def cb(
action: Optional[str],
args: Optional[Tuple[str, ...]] = None,
kwargs: Optional[Dict[str, Any]] = None,
):
) -> None:
self.cb_queue.put((action, args, kwargs))


Expand All @@ -229,7 +212,7 @@ def __init__(
cb_bar: Callable[..., None],
cb_ask_bool: Callable[..., bool],
cb_ask_str: Callable[..., str],
):
) -> None:
self.opt_input = opt_input
self.opt_comp = opt_comp
self.opt_output = opt_output
Expand Down Expand Up @@ -402,15 +385,17 @@ def verify_input(self) -> bool:

if self.opt_comp.scale_filter not in (
"nearest",
"box",
"bilinear",
"hamming",
"bicubic",
"lanczos",
):
error_msg += "\n"
error_msg += (
f"[X] scale_filter {self.opt_comp.scale_filter} is not valid option"
)
error_msg += " Valid options: nearest, bilinear, bicubic, lanczos"
error_msg += " Valid options: nearest, box, bilinear, hamming, bicubic, lanczos"

if self.opt_comp.quantize_method not in ("imagequant", "fastoctree", "none"):
error_msg += "\n"
Expand Down
Loading

0 comments on commit 282b3bb

Please sign in to comment.