diff --git a/sfscan.py b/sfscan.py index 5b51906e9b..80212e47e5 100644 --- a/sfscan.py +++ b/sfscan.py @@ -12,8 +12,11 @@ import socket import sys import time +import queue import traceback +from time import sleep from copy import deepcopy +from collections import OrderedDict import dns.resolver @@ -180,6 +183,9 @@ def __init__(self, scanName, scanId, targetValue, targetType, moduleList, global self.__setStatus("INITIALIZING", time.time() * 1000, None) + # Used when module threading is enabled + self.eventQueue = None + if start: self.__startScan() @@ -222,14 +228,20 @@ def __setStatus(self, status, started=None, ended=None): self.__status = status self.__dbh.scanInstanceSet(self.__scanId, started, ended, status) - def __startScan(self): - """Start running a scan.""" + def __startScan(self, threaded=True): + """Start running a scan. + Args: + threaded (bool): whether to thread modules + """ aborted = False self.__setStatus("STARTING", time.time() * 1000, None) self.__sf.status(f"Scan [{self.__scanId}] initiated.") + if threaded: + self.eventQueue = queue.Queue() + try: # moduleList = list of modules the user wants to run for modName in self.__moduleList: @@ -276,22 +288,32 @@ def __startScan(self): if self.__config['__outputfilter']: mod.setOutputFilter(self.__config['__outputfilter']) + # Register the target with the module + mod.setTarget(self.__target) + + if threaded: + # Set up the outgoing event queue + mod.outgoingEventQueue = self.eventQueue + mod.incomingEventQueue = queue.Queue() + self.__sf.status(modName + " module loaded.") - # Register listener modules and then start all modules sequentially - for module in list(self.__moduleInstances.values()): - # Register the target with the module - module.setTarget(self.__target) - - for listenerModule in list(self.__moduleInstances.values()): - # Careful not to register twice or you will get duplicate events - if listenerModule in module._listenerModules: - continue - # Note the absence of a check for whether a module can register - # to itself. That is intentional because some modules will - # act on their own notifications (e.g. sfp_dns)! - if listenerModule.watchedEvents() is not None: - module.registerListener(listenerModule) + # sort modules by priority + self.__moduleInstances = OrderedDict(sorted(self.__moduleInstances.items(), key=lambda m: m[-1]._priority)) + + if not threaded: + # Register listener modules and then start all modules sequentially + for module in list(self.__moduleInstances.values()): + + for listenerModule in list(self.__moduleInstances.values()): + # Careful not to register twice or you will get duplicate events + if listenerModule in module._listenerModules: + continue + # Note the absence of a check for whether a module can register + # to itself. That is intentional because some modules will + # act on their own notifications (e.g. sfp_dns)! + if listenerModule.watchedEvents() is not None: + module.registerListener(listenerModule) # Now we are ready to roll.. self.__setStatus("RUNNING") @@ -302,9 +324,13 @@ def __startScan(self): psMod.setTarget(self.__target) psMod.setDbh(self.__dbh) psMod.clearListeners() - for mod in list(self.__moduleInstances.values()): - if mod.watchedEvents() is not None: - psMod.registerListener(mod) + if threaded: + psMod.outgoingEventQueue = self.eventQueue + psMod.incomingEventQueue = queue.Queue() + else: + for mod in list(self.__moduleInstances.values()): + if mod.watchedEvents() is not None: + psMod.registerListener(mod) # Create the "ROOT" event which un-triggered modules will link events to rootEvent = SpiderFootEvent("ROOT", self.__targetValue, "", None) @@ -326,12 +352,16 @@ def __startScan(self): # Check in case the user requested to stop the scan between modules # initializing - for module in list(self.__moduleInstances.values()): - if module.checkForStop(): + for mod in list(self.__moduleInstances.values()): + if mod.checkForStop(): self.__setStatus('ABORTING') aborted = True break + # start threads + if threaded and not aborted: + self.waitForThreads() + if aborted: self.__sf.status(f"Scan [{self.__scanId}] aborted.") self.__setStatus("ABORTED", None, time.time() * 1000) @@ -347,3 +377,86 @@ def __startScan(self): self.__setStatus("ERROR-FAILED", None, time.time() * 1000) self.__dbh.close() + + def waitForThreads(self): + counter = 0 + + try: + if not self.eventQueue: + return + + # start one thread for each module + for mod in self.__moduleInstances.values(): + mod.start() + + # watch for newly-generated events + while True: + + # log status of threads every 100 iterations + log_status = counter % 100 == 0 + counter += 1 + + try: + sfEvent = self.eventQueue.get_nowait() + self.__sf.debug(f"waitForThreads() got event, {sfEvent.eventType}, from eventQueue.") + except queue.Empty: + # check if we're finished + if self.threadsFinished(log_status): + sleep(.1) + # but are we really? + if self.threadsFinished(log_status): + break + else: + # save on CPU + sleep(.01) + continue + + if not isinstance(sfEvent, SpiderFootEvent): + raise TypeError(f"sfEvent is {type(sfEvent)}; expected SpiderFootEvent") + + # for every module + for mod in self.__moduleInstances.values(): + # if it's been aborted + if mod._stopScanning: + # break out of the while loop + raise AssertionError(f"{mod.__name__} requested stop") + + # send it the new event if applicable + watchedEvents = mod.watchedEvents() + if sfEvent.eventType in watchedEvents or "*" in watchedEvents: + mod.incomingEventQueue.put(deepcopy(sfEvent)) + + except (KeyboardInterrupt, AssertionError) as e: + self.__sf.status(f"Scan [{self.__scanId}] aborted, {e}.") + + finally: + # tell the modules to stop + for mod in self.__moduleInstances.values(): + mod._stopScanning = True + + def threadsFinished(self, log_status=False): + if self.eventQueue is None: + return True + + modules_waiting = {m.__name__: m.incomingEventQueue.qsize() for m in self.__moduleInstances.values()} + modules_waiting = sorted(modules_waiting.items(), key=lambda x: x[-1], reverse=True) + modules_running = [m.__name__ for m in self.__moduleInstances.values() if m.running] + queues_empty = [qsize == 0 for m, qsize in modules_waiting] + + if not modules_running and not queues_empty: + self.__sf.debug("Clearing queues for stalled/aborted modules.") + for mod in self.__moduleInstances.values(): + try: + while True: + mod.incomingEventQueue.get_nowait() + except Exception: + pass + + if log_status and modules_running: + events_queued = ", ".join([f"{mod}: {qsize:,}" for mod, qsize in modules_waiting[:5] if qsize > 0]) + if events_queued: + self.__sf.info(f"Events queued: {events_queued}") + + if all(queues_empty) and not modules_running: + return True + return False diff --git a/spiderfoot/plugin.py b/spiderfoot/plugin.py index 7d39daca4d..aac338ed7d 100644 --- a/spiderfoot/plugin.py +++ b/spiderfoot/plugin.py @@ -1,4 +1,7 @@ import logging +import threading +import queue +from time import sleep class SpiderFootPlugin(): @@ -47,10 +50,18 @@ class SpiderFootPlugin(): errorState = False # SOCKS proxy socksProxy = None + # Queue for incoming events + incomingEventQueue = None + # Queue for produced events + outgoingEventQueue = None def __init__(self): """Not really needed in most cases.""" - pass + + # Whether the module is currently executing + self.running = False + # Holds the thread object when module threading is enabled + self.thread = None def _updateSocket(self, socksProxy): """Hack to override module's use of socket, replacing it with @@ -194,6 +205,7 @@ def notifyListeners(self, sfEvent): Raises: TypeError: sfEvent argument was invalid type """ + from spiderfoot import SpiderFootEvent if not isinstance(sfEvent, SpiderFootEvent): @@ -239,26 +251,31 @@ def notifyListeners(self, sfEvent): break prevEvent = prevEvent.sourceEvent - self._listenerModules.sort(key=lambda m: m._priority) + # output to queue if applicable + if self.outgoingEventQueue is not None: + self.outgoingEventQueue.put(sfEvent) + # otherwise, call other modules directly + else: + self._listenerModules.sort(key=lambda m: m._priority) - for listener in self._listenerModules: - if eventName not in listener.watchedEvents() and '*' not in listener.watchedEvents(): - continue + for listener in self._listenerModules: + if eventName not in listener.watchedEvents() and '*' not in listener.watchedEvents(): + continue - if storeOnly and "__stor" not in listener.__module__: - continue + if storeOnly and "__stor" not in listener.__module__: + continue - listener._currentEvent = sfEvent + listener._currentEvent = sfEvent - # Check if we've been asked to stop in the meantime, so that - # notifications stop triggering module activity. - if self.checkForStop(): - return + # Check if we've been asked to stop in the meantime, so that + # notifications stop triggering module activity. + if self.checkForStop(): + return - try: - listener.handleEvent(sfEvent) - except Exception as e: - self.log.exception(f"Module ({listener.__module__}) encountered an error: {e}") + try: + listener.handleEvent(sfEvent) + except Exception as e: + self.log.exception(f"Module ({listener.__module__}) encountered an error: {e}") def checkForStop(self): """For modules to use to check for when they should give back control. @@ -266,18 +283,21 @@ def checkForStop(self): Returns: bool """ - if not self.__scanId__: - return False + if self.outgoingEventQueue and self.incomingEventQueue: + return self._stopScanning + else: + if not self.__scanId__: + return False - scanstatus = self.__sfdb__.scanInstanceGet(self.__scanId__) + scanstatus = self.__sfdb__.scanInstanceGet(self.__scanId__) - if not scanstatus: - return False + if not scanstatus: + return False - if scanstatus[5] == "ABORT-REQUESTED": - return True + if scanstatus[5] == "ABORT-REQUESTED": + return True - return False + return False def watchedEvents(self): """What events is this module interested in for input. The format is a list @@ -314,11 +334,38 @@ def handleEvent(self, sfEvent): return def start(self): - """Kick off the work (for some modules nothing will happen here, but instead - the work will start from the handleEvent() method. - Will usually be overriden by the implementer. - """ + self.thread = threading.Thread(target=self.threadWorker) + self.thread.start() - return + def threadWorker(self): + try: + # create new database handle since we're in our own thread + from spiderfoot import SpiderFootDb + self.setDbh(SpiderFootDb(self.opts)) + + if not (self.incomingEventQueue and self.outgoingEventQueue): + self.log.error("Please set up queues before starting module as thread") + return + + while not self.checkForStop(): + try: + sfEvent = self.incomingEventQueue.get_nowait() + self.log.debug(f"{self.__name__}.threadWorker() got event, {sfEvent.eventType}, from incomingEventQueue.") + self.running = True + self.handleEvent(sfEvent) + self.running = False + except queue.Empty: + sleep(.3) + continue + except KeyboardInterrupt: + self.log.warning(f"Interrupted module {self.__name__}.") + self._stopScanning = True + except Exception as e: + import traceback + self.log.error(f"Exception ({e.__class__.__name__}) in module {self.__name__}." + + traceback.format_exc()) + self.errorState = True + finally: + self.running = False # end of SpiderFootPlugin class