From 2587aecf19de46b3f58b41e6039b2ed9f60099e7 Mon Sep 17 00:00:00 2001 From: TheraNinjaCat <236111633+TheraNinjaCat@users.noreply.github.com> Date: Thu, 17 Oct 2024 14:50:57 +1300 Subject: [PATCH] Added multiprocessing.Manager to solve multiprocessing.Queue issue on MacOS Changed format string in error messages from %e (float) to %s (string) in error logs. --- ntdissector/ntds/ntds.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ntdissector/ntds/ntds.py b/ntdissector/ntds/ntds.py index 615dae5..7a13e53 100644 --- a/ntdissector/ntds/ntds.py +++ b/ntdissector/ntds/ntds.py @@ -22,6 +22,8 @@ from json import loads from tqdm import tqdm import multiprocessing as mp +import threading +import queue from binascii import hexlify, unhexlify from base64 import b64encode from ntdissector.utils.crypto import PEK_LIST, format_asn1_to_pem @@ -708,7 +710,7 @@ def __translate(self, name: str) -> str: return name # multiprocessing with a queue - def __dumpObjectWorker(self, workersQ: mp.Queue, workerLock: mp.Lock) -> None: + def __dumpObjectWorker(self, workersQ: queue.Queue, workerLock: threading.Lock) -> None: wid = mp.current_process().name.split("-")[1] results = dict() logging.debug(f"Worker-{wid} started ") @@ -759,8 +761,9 @@ def printStats(stats: dict): def __reduce(record): return {"page_num": record._node.tag.page.num, "page_buf": bytes(record._node.tag.page.buf), "node_num": record._node.num} - workersQ = mp.Queue() - workerLock = mp.Lock() + mpManager = mp.Manager() + workersQ = mpManager.Queue() + workerLock = mpManager.Lock() pool = mp.get_context("fork").Pool(self.workers, self.__dumpObjectWorker, (workersQ, workerLock)) logging.info("Filtering records with this list of object classes : %s" % classList) if self.__skipDel: @@ -813,16 +816,13 @@ def __reduce(record): workersQ.put(None) try: - workersQ.close() - workersQ.join_thread() - logging.debug("Worker queue closed successfully") pool.close() pool.join() logging.debug("Process pool closed successfully") except BrokenPipeError as e: - logging.error("BrokenPipeError while closing pool - %e" % e) + logging.error("BrokenPipeError while closing pool - %s" % e) except Exception as e: - logging.error("Error while closing pool - %e" % e) + logging.error("Error while closing pool - %s" % e) raise e def getClasses(self) -> list: