Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
3mora2 committed Jul 21, 2024
1 parent 336555f commit 56e8f17
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 48 deletions.
159 changes: 126 additions & 33 deletions PlaywrightSafeThread/browser/threadsafe_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,24 @@
import subprocess
import sys
import tempfile
from typing import Callable, Literal
from concurrent.futures import Future
from typing import (
Callable,
Awaitable,
Set,
Optional,
Literal,
)
import asyncio
import platform
from threading import Thread, Event

from threading import Thread, Event, Lock
from playwright.async_api import (
async_playwright,
Browser,
Page,
BrowserType
)
from playwright._impl._driver import compute_driver_executable, get_driver_env
from playwright.async_api import async_playwright, Page, Browser, BrowserType

sys_os = platform.system()

Expand All @@ -37,6 +48,7 @@ def __init__(
browser: BrowserName = "chromium",
stealthy: bool = False,
install: bool = False,
install_callback= None,
check_open_dir=True,
close_already_profile=True,
loop=None,
Expand Down Expand Up @@ -222,6 +234,7 @@ def __init__(
# from PlaywrightSafeThread._future_.threaded_child_watcher import ThreadedChildWatcher
# asyncio.set_child_watcher(ThreadedChildWatcher())

self.install_callback = install_callback
self._stealthy = stealthy
self._no_context = no_context
self._browser_name = browser
Expand Down Expand Up @@ -260,7 +273,14 @@ def __init__(

self.loop = asyncio.new_event_loop()
self.start_event = Event()
self.thread = Thread(target=self.__thread_worker, daemon=True)
# self.thread = Thread(target=self.__thread_worker, daemon=True)
self.thread = Thread(
name="Thread-browser-%i" % id(self), target=self.__thread_worker
)

# TODO::
self.running_futures: Set[Future] = set()
self.running_futures_lock = Lock()

# Starting loop thread
self.thread.start()
Expand All @@ -280,10 +300,17 @@ def __thread_worker(self):

self.loop.run_until_complete(self.__stop_playwright())

async def create_task(self, task):
async def create_task(self, task, *args, **kwargs):
if not asyncio.iscoroutine(task):
task = task(*args, **kwargs)

if self.is_same_loop:
# TODO :: use __handle_future
return await self.loop.create_task(task)
return asyncio.run_coroutine_threadsafe(task, self.loop).result()

future = asyncio.run_coroutine_threadsafe(task, self.loop)
return self.__handle_future(future)
# return asyncio.run_coroutine_threadsafe(task, self.loop).result()

@property
def is_same_loop(self):
Expand All @@ -294,14 +321,26 @@ def is_same_loop(self):
return True
raise e

def run_threadsafe(self, task, timeout_=120):
def run_threadsafe(self, task, *args, timeout_=120, **kwargs):
if not asyncio.iscoroutine(task):
task = task(*args, **kwargs)

if not self.is_same_loop:
future = asyncio.run_coroutine_threadsafe(
task, self.loop
)
result = future.result(timeout=timeout_)
return result
future = asyncio.run_coroutine_threadsafe(task, self.loop)
return self.__handle_future(future, timeout=timeout_)

# def _on_completion(f):
# exception = f.exception()
#
# if exception:
# raise exception
#
# future.add_done_callback(_on_completion)

# result = future.result(timeout=timeout_)
# return result

# TODO :: use __handle_future
start_event_task = Event()

async def run_task(task_):
Expand All @@ -312,6 +351,7 @@ async def run_task(task_):
future = self.loop.create_task(
run_task(task)
)

start_event_task.wait(timeout_)
result = future.result()
return result
Expand All @@ -326,6 +366,7 @@ async def __start_playwright(self) -> None:
elif self._browser_name == "webkit":
self.browser_type = self.playwright.webkit
elif self._no_context:
# No context or page needs to be created
pass
else:
raise TypeError("unsupported browser")
Expand All @@ -347,8 +388,24 @@ async def __start_playwright(self) -> None:

self.page = await self.first_page()

# def stop(self) -> None:
# self.loop.call_soon_threadsafe(self.loop.stop)

def stop(self) -> None:
    """Cancel tracked futures, stop the event loop and wait for its thread.

    Every future in ``self.running_futures`` that is not done yet is
    cancelled, ``self.loop.stop`` is scheduled on the loop via
    ``call_soon_threadsafe`` and ``self.thread`` is joined.

    The join is skipped when ``stop()`` is invoked from ``self.thread``
    itself: a thread cannot join itself (``Thread.join`` raises
    ``RuntimeError`` in that case).
    """
    from threading import current_thread

    # NOTE: if we don't do this and some job sent
    # to a threadpool executor raises and triggers
    # the closing of the playwright driver, then a
    # catastrophic chain reaction of exceptions will
    # make some jobs hang and block indefinitely,
    # which will cause a deadlock.
    # TODO::
    with self.running_futures_lock:
        for fut in self.running_futures:
            if not fut.done():
                fut.cancel()

    self.loop.call_soon_threadsafe(self.loop.stop)

    # Joining the current thread would raise RuntimeError; the loop stops
    # on its own once the scheduled ``loop.stop`` callback runs.
    if current_thread() is not self.thread:
        self.thread.join()

async def __stop_playwright(self) -> None:
# NOTE: we need to make sure those were actually launched, in
Expand Down Expand Up @@ -384,25 +441,28 @@ async def __stop_playwright(self) -> None:
pass

def check_close_profile(self, path):
import psutil
for proc in psutil.process_iter():
name = proc.name()
if "chrome.exe" in name:
cmd = proc.cmdline()
user = list(filter(lambda x: "--user-data-dir" in x, cmd))
if user:
# path_old.add(p)
p = os.path.normpath(user[0].split("=")[-1])
if os.path.normpath(path) == p and self.__close_already_profile:
proc.terminate()

elif "firefox.exe" in name:
cmd = proc.cmdline()
user = list(filter(lambda x: "-profile" in x, cmd))
if user:
p = os.path.normpath(cmd[cmd.index(user[0]) + 1])
if os.path.normpath(path) == p and self.__close_already_profile:
proc.terminate()
try:
import psutil
for proc in psutil.process_iter():
name = proc.name()
if "chrome.exe" in name:
cmd = proc.cmdline()
user = list(filter(lambda x: "--user-data-dir" in x, cmd))
if user:
# path_old.add(p)
p = os.path.normpath(user[0].split("=")[-1])
if os.path.normpath(path) == p and self.__close_already_profile:
proc.terminate()

elif "firefox.exe" in name:
cmd = proc.cmdline()
user = list(filter(lambda x: "-profile" in x, cmd))
if user:
p = os.path.normpath(cmd[cmd.index(user[0]) + 1])
if os.path.normpath(path) == p and self.__close_already_profile:
proc.terminate()
except:
Logger.exception("check_close_profile")

def __enter__(self):
return self
Expand All @@ -414,7 +474,8 @@ def check_is_install(self, browser):
env = self.get_driver_env()
driver_executable, driver_cli = compute_driver_executable()

completed_process = subprocess.check_output([driver_executable, driver_cli, 'install', browser, '--dry-run'], env=env,
completed_process = subprocess.check_output([driver_executable, driver_cli, 'install', browser, '--dry-run'],
env=env,
**creation_flags_dict())

locale_ = ":".join(next(filter(lambda x: "Install location" in x,
Expand All @@ -440,6 +501,8 @@ def run_playwright(self, *args: str):
stderr=subprocess.STDOUT, **creation_flags_dict()) as process:
for line in process.stdout:
Logger.info(line.decode('utf-8'))
if self.install_callback:
self.install_callback(line.decode('utf-8'))

####################################################################################################################
async def first_page(self) -> "Page":
Expand Down Expand Up @@ -504,6 +567,36 @@ def sync_close(self, timeout_=60):
self.run_threadsafe(self.__stop_playwright(), timeout_=timeout_)
self.stop()

###########################
def __handle_future(self, future: Future, timeout=None):
    """Block on *future* while keeping it registered in ``running_futures``.

    The future is added to ``self.running_futures`` (guarded by
    ``self.running_futures_lock``) before waiting and removed again once
    ``future.result`` returns or raises.

    :param future: the ``concurrent.futures.Future`` to wait for.
    :param timeout: forwarded to ``future.result``; ``None`` waits forever.
    :return: whatever ``future.result(timeout=timeout)`` returns.
    """
    with self.running_futures_lock:
        self.running_futures.add(future)

    try:
        outcome = future.result(timeout=timeout)
    finally:
        # Always unregister, whether result() returned or raised.
        with self.running_futures_lock:
            self.running_futures.remove(future)
    return outcome

def run_in_loop(self, task):
    """Submit coroutine *task* to ``self.loop`` and return its result.

    The coroutine is scheduled with ``asyncio.run_coroutine_threadsafe``
    and the resulting future is handed to ``__handle_future``, which is
    called without a timeout.
    """
    return self.__handle_future(
        asyncio.run_coroutine_threadsafe(task, self.loop)
    )

async def to_do_with_callback_(self, task, callback: Optional[Callable[[Page], Awaitable[None]]] = None, ):
    """Await ``task()`` and pass its result to *callback*.

    :param task: zero-argument callable returning an awaitable.
    :param callback: optional callable invoked with the task's result.
        If it returns an awaitable (e.g. it is an ``async def``), that
        awaitable is awaited so the callback actually runs; plain
        synchronous callbacks keep working unchanged.
    :return: the result of ``await task()``.
    """
    import inspect

    result = await task()
    if callback:
        outcome = callback(result)
        # The annotation promises an awaitable-returning callback; without
        # this await an async callback's coroutine would never execute.
        if inspect.isawaitable(outcome):
            await outcome
    return result

def to_do_with_callback(
        self,
        task,
        callback: Optional[Callable[[Page], Awaitable[None]]] = None,
):
    """Blocking wrapper around ``to_do_with_callback_``.

    Builds the ``to_do_with_callback_`` coroutine for *task* and
    *callback* and returns whatever ``self.run_threadsafe`` returns for it.
    """
    pending = self.to_do_with_callback_(task, callback=callback)
    return self.run_threadsafe(pending)


def creation_flags_dict():
try:
Expand Down
31 changes: 17 additions & 14 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@

Logger = logging.getLogger()

# th = ThreadsafeBrowser(browser="chromium")
# browser = th.run_threadsafe(th.browser_type.launch, channel="chrome", headless=False)
# context = th.run_threadsafe(browser.new_context, no_viewport=True, bypass_csp=True)
# page = th.run_threadsafe(context.new_page)
# th.run_threadsafe(page.goto, "https://web.whatsapp.com/", wait_until="networkidle")
# th.sync_close()
th = ThreadsafeBrowser(browser="chromium")
browser = th.run_threadsafe(th.browser_type.launch, channel="chrome", headless=False)
context = th.run_threadsafe(browser.new_context, no_viewport=True, bypass_csp=True)
page = th.run_threadsafe(context.new_page)
try:
th.run_threadsafe(page.goto, "https://web.whatsapp.com/", wait_until="networkidle")
finally:
th.sync_close()

# OR

th = ThreadsafeBrowser(
install=True,
no_context=False,
browser="chromium", channel="chrome", headless=False,
no_viewport=True, bypass_csp=True
)
th.run_threadsafe(th.page.goto, "https://web.whatsapp.com/", wait_until="networkidle")
th.sync_close()
# th = ThreadsafeBrowser(
# install=False,
# install_callback=print,
# no_context=False,
# browser="chromium", channel="chrome", headless=False,
# no_viewport=True, bypass_csp=True
# )
# th.run_threadsafe(th.page.goto, "https://web.whatsapp.com/")
# th.sync_close()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ playwright-stealth
psutil
pyee
typing_extensions

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
long_description = open("README.md", encoding="utf-8").read()
description = "PlaywrightSafeThread"

version = "0.5.4.1"
version = "0.5.5"

setup(
name="PlaywrightSafeThread",
Expand Down

0 comments on commit 56e8f17

Please sign in to comment.